/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.mapreduce.JobUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The CompactionTool allows one to execute a compaction, specifying a:
 * <ul>
 *  <li>table folder (all regions and families will be compacted)
 *  <li>region folder (all families in the region will be compacted)
 *  <li>family folder (the store files will be compacted)
 * </ul>
 */
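/*
 * Example invocations (a sketch; the table paths below are illustrative and depend on
 * the cluster's root directory layout):
 *
 *   # Compact every region and family of a table, as a MapReduce job:
 *   $ hbase org.apache.hadoop.hbase.regionserver.CompactionTool \
 *       -mapred -major hdfs://namenode/hbase/data/default/TestTable
 *
 *   # Compact a single family directory from this client:
 *   $ hbase org.apache.hadoop.hbase.regionserver.CompactionTool \
 *       hdfs://namenode/hbase/data/default/TestTable/abc/x
 */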
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class CompactionTool extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CompactionTool.class);

  private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
  private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
  private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";

  /**
   * Class responsible for executing the compaction on the specified path. The path can be a
   * table, region or family directory.
   */
  private static class CompactionWorker {
    private final boolean deleteCompacted;
    private final Configuration conf;
    private final FileSystem fs;

    public CompactionWorker(final FileSystem fs, final Configuration conf) {
      this.conf = conf;
      this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
      this.fs = fs;
    }

    /**
     * Execute the compaction on the specified path.
     * @param path        Directory path on which to run compaction.
     * @param compactOnce Execute just a single step of compaction.
     * @param major       Request major compaction.
     */
    public void compact(final Path path, final boolean compactOnce, final boolean major)
      throws IOException {
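      // Dispatch on the directory type: a family dir is the leaf unit of work, a region dir
      // fans out to all of its families, and a table dir fans out to all of its regions.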
      if (isFamilyDir(fs, path)) {
        Path regionDir = path.getParent();
        Path tableDir = regionDir.getParent();
        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
        compactStoreFiles(tableDir, htd, hri, path.getName(), compactOnce, major);
      } else if (isRegionDir(fs, path)) {
        Path tableDir = path.getParent();
        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
        compactRegion(tableDir, htd, path, compactOnce, major);
      } else if (isTableDir(fs, path)) {
        compactTable(path, compactOnce, major);
      } else {
        throw new IOException(
          "Specified path is not a table, region or family directory. path=" + path);
      }
    }

    private void compactTable(final Path tableDir, final boolean compactOnce, final boolean major)
      throws IOException {
      TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
      for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
        compactRegion(tableDir, htd, regionDir, compactOnce, major);
      }
    }

    private void compactRegion(final Path tableDir, final TableDescriptor htd, final Path regionDir,
      final boolean compactOnce, final boolean major) throws IOException {
      RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
      for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
        compactStoreFiles(tableDir, htd, hri, familyDir.getName(), compactOnce, major);
      }
    }

    /**
     * Execute the actual compaction job. If the compact once flag is not specified, execute the
     * compaction until no more compactions are needed. Uses the Configuration settings provided.
     */
    private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
      final RegionInfo hri, final String familyName, final boolean compactOnce, final boolean major)
      throws IOException {
      HStore store = getStore(conf, fs, tableDir, htd, hri, familyName);
      LOG.info("Compact table=" + htd.getTableName() + " region=" + hri.getRegionNameAsString()
        + " family=" + familyName);
      if (major) {
        store.triggerMajorCompaction();
      }
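      // Request and run compactions until the store reports no more work is needed,
      // or stop after a single pass when compactOnce is set.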
      do {
        Optional<CompactionContext> compaction =
          store.requestCompaction(PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
        if (!compaction.isPresent()) {
          break;
        }
        List<HStoreFile> storeFiles =
          store.compact(compaction.get(), NoLimitThroughputController.INSTANCE, null);
        if (storeFiles != null && !storeFiles.isEmpty()) {
          if (deleteCompacted) {
            for (HStoreFile storeFile : storeFiles) {
              fs.delete(storeFile.getPath(), false);
            }
          }
        }
      } while (store.needsCompaction() && !compactOnce);
      // We need to close the store properly, to make sure it will archive compacted files
      store.close();
    }

    private static HStore getStore(final Configuration conf, final FileSystem fs,
      final Path tableDir, final TableDescriptor htd, final RegionInfo hri, final String familyName)
      throws IOException {
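      // Build a standalone region and store over the on-disk layout. The null arguments
      // (the WAL and the RegionServerServices) are not needed for offline compaction.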
      HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri);
      HRegion region = new HRegion(regionFs, null, conf, htd, null);
      return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
    }
  }

  private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
    Path regionInfo = new Path(path, HRegionFileSystem.REGION_INFO_FILE);
    return fs.exists(regionInfo);
  }

  private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
    return FSTableDescriptors.isTableDir(fs, path);
  }

  private static boolean isFamilyDir(final FileSystem fs, final Path path) throws IOException {
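    // A family directory is any directory whose parent contains a region info file.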
    return isRegionDir(fs, path.getParent());
  }

  private static class CompactionMapper
    extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
    private CompactionWorker compactor = null;
    private boolean compactOnce = false;
    private boolean major = false;

    @Override
    public void setup(Context context) {
      Configuration conf = context.getConfiguration();
      compactOnce = conf.getBoolean(CONF_COMPACT_ONCE, false);
      major = conf.getBoolean(CONF_COMPACT_MAJOR, false);

      try {
        FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);
        this.compactor = new CompactionWorker(fs, conf);
      } catch (IOException e) {
        throw new RuntimeException("Could not get the input FileSystem", e);
      }
      // Disable MemStoreLAB since the MemStore is not used during compaction. The worker shares
      // this Configuration object, so the setting takes effect when the store is created.
      conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
      throws InterruptedException, IOException {
      Path path = new Path(value.toString());
      this.compactor.compact(path, compactOnce, major);
    }
  }

  /**
   * Input format that uses the store files' block locations as input split locality.
   */
  private static class CompactionInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
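      // Splits are built by hand in getSplits(), one per line of the input file.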
      return true;
    }

    /**
     * Returns a split for each store file directory, using the block locations of its files as
     * the locality reference.
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      List<InputSplit> splits = new ArrayList<>();
      List<FileStatus> files = listStatus(job);

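      // The input file lists one store directory per line; emit one split per line so that
      // each map task compacts a single store.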
      Text key = new Text();
      for (FileStatus file : files) {
        Path path = file.getPath();
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        LineReader reader = new LineReader(fs.open(path));
        long pos = 0;
        int n;
        try {
          while ((n = reader.readLine(key)) > 0) {
            // Derive split locality from the store directory named on this line,
            // not from the input file itself.
            Path storeDir = new Path(key.toString());
            FileSystem storeFs = storeDir.getFileSystem(job.getConfiguration());
            String[] hosts = getStoreDirHosts(storeFs, storeDir);
            splits.add(new FileSplit(path, pos, n, hosts));
            pos += n;
          }
        } finally {
          reader.close();
        }
      }

      return splits;
    }

    /**
     * Returns the top hosts of the store files, used by the split.
     */
    private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
      throws IOException {
      FileStatus[] files = CommonFSUtils.listStatus(fs, path);
      if (files == null) {
        return new String[] {};
      }

      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
      for (FileStatus hfileStatus : files) {
        HDFSBlocksDistribution storeFileBlocksDistribution =
          FSUtils.computeHDFSBlocksDistribution(fs, hfileStatus, 0, hfileStatus.getLen());
        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
      }

      List<String> hosts = hdfsBlocksDistribution.getTopHosts();
      return hosts.toArray(new String[hosts.size()]);
    }

    /**
     * Create the input file for the given directories to compact. The file is a text file with
     * each line corresponding to a store directory to compact.
     */
    public static List<Path> createInputFile(final FileSystem fs, final FileSystem stagingFs,
      final Path path, final Set<Path> toCompactDirs) throws IOException {
      // Extract the list of store dirs
      List<Path> storeDirs = new LinkedList<>();
      for (Path compactDir : toCompactDirs) {
        if (isFamilyDir(fs, compactDir)) {
          storeDirs.add(compactDir);
        } else if (isRegionDir(fs, compactDir)) {
          storeDirs.addAll(FSUtils.getFamilyDirs(fs, compactDir));
        } else if (isTableDir(fs, compactDir)) {
          // Lookup regions
          for (Path regionDir : FSUtils.getRegionDirs(fs, compactDir)) {
            storeDirs.addAll(FSUtils.getFamilyDirs(fs, regionDir));
          }
        } else {
          throw new IOException(
            "Specified path is not a table, region or family directory. path=" + compactDir);
        }
      }

      // Write the input file: one store directory path per line
      FSDataOutputStream stream = stagingFs.create(path);
      LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
      try {
        final byte[] newLine = Bytes.toBytes("\n");
        for (Path storeDir : storeDirs) {
          stream.write(Bytes.toBytes(storeDir.toString()));
          stream.write(newLine);
        }
      } finally {
        stream.close();
      }
      return storeDirs;
    }
  }

  /**
   * Execute compaction, using a Map-Reduce job.
   */
  private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
    final boolean compactOnce, final boolean major) throws Exception {
    Configuration conf = getConf();
    conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
    conf.setBoolean(CONF_COMPACT_MAJOR, major);

    Job job = Job.getInstance(conf);
    job.setJobName("CompactionTool");
    job.setJarByClass(CompactionTool.class);
    job.setMapperClass(CompactionMapper.class);
    job.setInputFormatClass(CompactionInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);
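    // The job is map-only: each map task compacts the store directories named by its split.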

    // add dependencies (including HBase ones)
    TableMapReduceUtil.addDependencyJars(job);

    Path stagingDir = JobUtil.getQualifiedStagingDir(conf);
    FileSystem stagingFs = stagingDir.getFileSystem(conf);
    try {
      // Create input file with the store dirs
      Path inputPath = new Path(stagingDir, "compact-" + EnvironmentEdgeManager.currentTime());
      List<Path> storeDirs =
        CompactionInputFormat.createInputFile(fs, stagingFs, inputPath, toCompactDirs);
      CompactionInputFormat.addInputPath(job, inputPath);

      // Initialize credentials for a secure cluster
      TableMapReduceUtil.initCredentials(job);
      // Despite the method name, this obtains delegation tokens for the filesystems
      TokenCache.obtainTokensForNamenodes(job.getCredentials(), storeDirs.toArray(new Path[0]),
        conf);

      // Start the MR Job and wait
      return job.waitForCompletion(true) ? 0 : 1;
    } finally {
      // The input file lives on the staging filesystem, so clean up through stagingFs
      stagingFs.delete(stagingDir, true);
    }
  }

  /**
   * Execute compaction, from this client, one path at a time.
   */
  private int doClient(final FileSystem fs, final Set<Path> toCompactDirs,
    final boolean compactOnce, final boolean major) throws IOException {
    // Disable MemStoreLAB since the MemStore is not used during compaction
    getConf().setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
    CompactionWorker worker = new CompactionWorker(fs, getConf());
    for (Path path : toCompactDirs) {
      worker.compact(path, compactOnce, major);
    }
    return 0;
  }

  @Override
  public int run(String[] args) throws Exception {
    Set<Path> toCompactDirs = new HashSet<>();
    boolean compactOnce = false;
    boolean major = false;
    boolean mapred = false;

    Configuration conf = getConf();
    FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);

    try {
      for (int i = 0; i < args.length; ++i) {
        String opt = args[i];
        if (opt.equals("-compactOnce")) {
          compactOnce = true;
        } else if (opt.equals("-major")) {
          major = true;
        } else if (opt.equals("-mapred")) {
          mapred = true;
        } else if (!opt.startsWith("-")) {
          Path path = new Path(opt);
          FileStatus status = fs.getFileStatus(path);
          if (!status.isDirectory()) {
            printUsage("Specified path is not a directory. path=" + path);
            return 1;
          }
          toCompactDirs.add(path);
        } else {
          printUsage("Unsupported option: " + opt);
          return 1;
        }
      }
    } catch (Exception e) {
      printUsage(e.getMessage());
      return 1;
    }

    if (toCompactDirs.isEmpty()) {
      printUsage("No directories to compact specified.");
      return 1;
    }

    // Execute compaction!
    if (mapred) {
      return doMapReduce(fs, toCompactDirs, compactOnce, major);
    } else {
      return doClient(fs, toCompactDirs, compactOnce, major);
    }
  }

  private void printUsage() {
    printUsage(null);
  }

  private void printUsage(final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: java " + this.getClass().getName() + " \\");
    System.err.println("  [-compactOnce] [-major] [-mapred] [-D<property=value>]* files...");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" -mapred        Use MapReduce to run compaction.");
    System.err.println(" -compactOnce   Execute just one compaction step. (default: while needed)");
    System.err.println(" -major         Trigger major compaction.");
    System.err.println();
    System.err.println("Note: -D properties will be applied to the configuration used.");
    System.err.println("For example:");
    System.err.println(
      " To disable deletion of compacted files, pass -D" + CONF_DELETE_COMPACTED + "=false");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To compact the full 'TestTable' using MapReduce:");
    System.err.println(
      " $ hbase " + this.getClass().getName() + " -mapred hdfs://hbase/data/default/TestTable");
    System.err.println();
    System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
    System.err.println(
      " $ hbase " + this.getClass().getName() + " hdfs://hbase/data/default/TestTable/abc/x");
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
  }
}