001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import org.apache.commons.lang3.StringUtils;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.client.Result;
029import org.apache.hadoop.hbase.client.Scan;
030import org.apache.hadoop.hbase.filter.Filter;
031import org.apache.hadoop.hbase.filter.FilterBase;
032import org.apache.hadoop.hbase.filter.FilterList;
033import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
034import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
035import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
036import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
037import org.apache.hadoop.hbase.util.AbstractHBaseTool;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.mapreduce.Counter;
040import org.apache.hadoop.mapreduce.Job;
041import org.apache.hadoop.mapreduce.Mapper;
042import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
048import org.apache.hbase.thirdparty.org.apache.commons.cli.BasicParser;
049import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
050import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
051import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
052import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException;
053import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
054
055/**
056 * A job with a just a map phase to count rows. Map outputs table rows IF the input row has columns
057 * that have content.
058 */
059@InterfaceAudience.Public
060public class RowCounter extends AbstractHBaseTool {
061
062  private static final Logger LOG = LoggerFactory.getLogger(RowCounter.class);
063
064  /** Name of this 'program'. */
065  static final String NAME = "rowcounter";
066
067  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
068  private final static String EXPECTED_COUNT_KEY = RowCounter.class.getName() + ".expected_count";
069
070  private final static String OPT_START_TIME = "starttime";
071  private final static String OPT_END_TIME = "endtime";
072  private final static String OPT_RANGE = "range";
073  private final static String OPT_EXPECTED_COUNT = "expectedCount";
074  private final static String OPT_COUNT_DELETE_MARKERS = "countDeleteMarkers";
075
076  private String tableName;
077  private List<MultiRowRangeFilter.RowRange> rowRangeList;
078  private long startTime;
079  private long endTime;
080  private long expectedCount;
081  private boolean countDeleteMarkers;
082  private List<String> columns = new ArrayList<>();
083
084  private Job job;
085
086  /**
087   * Mapper that runs the count.
088   */
089  static class RowCounterMapper extends TableMapper<ImmutableBytesWritable, Result> {
090
091    /** Counter enumeration to count the actual rows, cells and delete markers. */
092    public static enum Counters {
093      ROWS,
094      DELETE,
095      DELETE_COLUMN,
096      DELETE_FAMILY,
097      DELETE_FAMILY_VERSION,
098      ROWS_WITH_DELETE_MARKER
099    }
100
101    private boolean countDeleteMarkers;
102
103    @Override
104    protected void
105      setup(Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Result>.Context context)
106        throws IOException, InterruptedException {
107      Configuration conf = context.getConfiguration();
108      countDeleteMarkers = conf.getBoolean(OPT_COUNT_DELETE_MARKERS, false);
109    }
110
111    /**
112     * Maps the data.
113     * @param row     The current table row key.
114     * @param values  The columns.
115     * @param context The current context.
116     * @throws IOException When something is broken with the data.
117     * @see org.apache.hadoop.mapreduce.Mapper#map(Object, Object, Context)
118     */
119    @Override
120    public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {
121      // Count every row containing data, whether it's in qualifiers or values
122      context.getCounter(Counters.ROWS).increment(1);
123
124      if (countDeleteMarkers) {
125        boolean rowContainsDeleteMarker = false;
126        for (Cell cell : values.rawCells()) {
127          Cell.Type type = cell.getType();
128          switch (type) {
129            case Delete:
130              rowContainsDeleteMarker = true;
131              context.getCounter(Counters.DELETE).increment(1);
132              break;
133            case DeleteColumn:
134              rowContainsDeleteMarker = true;
135              context.getCounter(Counters.DELETE_COLUMN).increment(1);
136              break;
137            case DeleteFamily:
138              rowContainsDeleteMarker = true;
139              context.getCounter(Counters.DELETE_FAMILY).increment(1);
140              break;
141            case DeleteFamilyVersion:
142              rowContainsDeleteMarker = true;
143              context.getCounter(Counters.DELETE_FAMILY_VERSION).increment(1);
144              break;
145            default:
146              break;
147          }
148        }
149
150        if (rowContainsDeleteMarker) {
151          context.getCounter(Counters.ROWS_WITH_DELETE_MARKER).increment(1);
152        }
153      }
154    }
155  }
156
157  /**
158   * Sets up the actual job.
159   * @param conf The current configuration.
160   * @return The newly created job.
161   * @throws IOException When setting up the job fails.
162   */
163  public Job createSubmittableJob(Configuration conf) throws IOException {
164    conf.setBoolean(OPT_COUNT_DELETE_MARKERS, this.countDeleteMarkers);
165    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
166    job.setJarByClass(RowCounter.class);
167    Scan scan = new Scan();
168    // raw scan will be needed to account for delete markers when --countDeleteMarkers flag is set
169    scan.setRaw(this.countDeleteMarkers);
170    scan.setCacheBlocks(false);
171    setScanFilter(scan, rowRangeList, this.countDeleteMarkers);
172
173    for (String columnName : this.columns) {
174      String family = StringUtils.substringBefore(columnName, ":");
175      String qualifier = StringUtils.substringAfter(columnName, ":");
176      if (StringUtils.isBlank(qualifier)) {
177        scan.addFamily(Bytes.toBytes(family));
178      } else {
179        scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
180      }
181    }
182
183    if (this.expectedCount >= 0) {
184      conf.setLong(EXPECTED_COUNT_KEY, this.expectedCount);
185    }
186
187    scan.setTimeRange(startTime, endTime);
188    job.setOutputFormatClass(NullOutputFormat.class);
189    TableMapReduceUtil.initTableMapperJob(tableName, scan, RowCounterMapper.class,
190      ImmutableBytesWritable.class, Result.class, job);
191    job.setNumReduceTasks(0);
192    return job;
193  }
194
195  /**
196   * Sets up the actual job.
197   * @param conf The current configuration.
198   * @param args The command line parameters.
199   * @return The newly created job.
200   * @throws IOException When setting up the job fails.
201   * @deprecated as of release 2.3.0. Will be removed on 4.0.0. Please use main method instead.
202   */
203  @Deprecated
204  public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
205    String tableName = args[0];
206    List<MultiRowRangeFilter.RowRange> rowRangeList = null;
207    long startTime = 0;
208    long endTime = 0;
209    boolean countDeleteMarkers = false;
210
211    StringBuilder sb = new StringBuilder();
212
213    final String rangeSwitch = "--range=";
214    final String startTimeArgKey = "--starttime=";
215    final String endTimeArgKey = "--endtime=";
216    final String expectedCountArg = "--expected-count=";
217    final String countDeleteMarkersArg = "--countDeleteMarkers";
218
219    // First argument is table name, starting from second
220    for (int i = 1; i < args.length; i++) {
221      if (args[i].startsWith(rangeSwitch)) {
222        try {
223          rowRangeList = parseRowRangeParameter(
224            args[i].substring(args[1].indexOf(rangeSwitch) + rangeSwitch.length()));
225        } catch (IllegalArgumentException e) {
226          return null;
227        }
228        continue;
229      }
230      if (args[i].startsWith(startTimeArgKey)) {
231        startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
232        continue;
233      }
234      if (args[i].startsWith(endTimeArgKey)) {
235        endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
236        continue;
237      }
238      if (args[i].startsWith(expectedCountArg)) {
239        conf.setLong(EXPECTED_COUNT_KEY,
240          Long.parseLong(args[i].substring(expectedCountArg.length())));
241        continue;
242      }
243      if (args[i].startsWith(countDeleteMarkersArg)) {
244        countDeleteMarkers = true;
245        continue;
246      }
247      // if no switch, assume column names
248      sb.append(args[i]);
249      sb.append(" ");
250    }
251    conf.setBoolean(OPT_COUNT_DELETE_MARKERS, countDeleteMarkers);
252    if (endTime < startTime) {
253      printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
254      return null;
255    }
256
257    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
258    job.setJarByClass(RowCounter.class);
259    Scan scan = new Scan();
260    scan.setCacheBlocks(false);
261    // raw scan will be needed to account for delete markers when --countDeleteMarkers flag is set
262    scan.setRaw(countDeleteMarkers);
263    setScanFilter(scan, rowRangeList, countDeleteMarkers);
264    if (sb.length() > 0) {
265      for (String columnName : sb.toString().trim().split(" ")) {
266        String family = StringUtils.substringBefore(columnName, ":");
267        String qualifier = StringUtils.substringAfter(columnName, ":");
268
269        if (StringUtils.isBlank(qualifier)) {
270          scan.addFamily(Bytes.toBytes(family));
271        } else {
272          scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
273        }
274      }
275    }
276    scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
277    job.setOutputFormatClass(NullOutputFormat.class);
278    TableMapReduceUtil.initTableMapperJob(tableName, scan, RowCounterMapper.class,
279      ImmutableBytesWritable.class, Result.class, job);
280    job.setNumReduceTasks(0);
281    return job;
282  }
283
284  /**
285   * Prints usage without error message. Note that we don't document --expected-count, because it's
286   * intended for test.
287   */
288  private static void printUsage(String errorMessage) {
289    System.err.println("ERROR: " + errorMessage);
290    System.err.println(
291      "Usage: hbase rowcounter [options] <tablename> " + "[--starttime=<start> --endtime=<end>] "
292        + "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [<column1> <column2>...]");
293    System.err.println("For performance consider the following options:\n"
294      + "-Dhbase.client.scanner.caching=100\n" + "-Dmapreduce.map.speculative=false");
295  }
296
297  private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(String arg) {
298    final List<String> rangesSplit = Splitter.on(";").splitToList(arg);
299    final List<MultiRowRangeFilter.RowRange> rangeList = new ArrayList<>();
300    for (String range : rangesSplit) {
301      if (range != null && !range.isEmpty()) {
302        List<String> startEnd = Splitter.on(",").splitToList(range);
303        if (startEnd.size() != 2 || startEnd.get(1).contains(",")) {
304          throw new IllegalArgumentException("Wrong range specification: " + range);
305        }
306        String startKey = startEnd.get(0);
307        String endKey = startEnd.get(1);
308        rangeList.add(new MultiRowRangeFilter.RowRange(Bytes.toBytesBinary(startKey), true,
309          Bytes.toBytesBinary(endKey), false));
310      }
311    }
312    return rangeList;
313  }
314
315  /**
316   * Sets filter {@link FilterBase} to the {@link Scan} instance. If provided rowRangeList contains
317   * more than one element, method sets filter which is instance of {@link MultiRowRangeFilter}. If
318   * rowRangeList contains exactly one element, startRow and stopRow are set to the scan. Also,
319   * method may apply {@link FirstKeyOnlyFilter} an {@link KeyOnlyFilter} for better performance.
320   */
321  private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList,
322    boolean countDeleteMarkers) {
323    List<Filter> filters = new ArrayList<>();
324
325    // Apply filters for better performance.
326    if (!countDeleteMarkers) {
327      // We only need one cell per row, unless --countDeleteMarkers flag is set.
328      filters.add(new FirstKeyOnlyFilter());
329
330      // We're not interested in values. Use KeyOnlyFilter.
331      // NOTE: Logically, KeyOnlyFilter should be okay for --countDeleteMarkers, because it should
332      // only empty the values, but currently, having a filter changes the behavior of a raw scan.
333      // So we don't use it. See {@link UserScanQueryMatcher#mergeFilterResponse} for more details.
334      filters.add(new KeyOnlyFilter());
335    }
336
337    // Depending on the number of ranges, set start/stop row or apply MultiRowRangeFilter.
338    final int size = rowRangeList == null ? 0 : rowRangeList.size();
339    if (size == 1) {
340      MultiRowRangeFilter.RowRange range = rowRangeList.get(0);
341      scan.withStartRow(range.getStartRow()); // inclusive
342      scan.withStopRow(range.getStopRow()); // exclusive
343    } else if (size > 1) {
344      filters.add(new MultiRowRangeFilter(rowRangeList));
345    }
346
347    if (filters.isEmpty()) {
348      return;
349    }
350    scan.setFilter(filters.size() == 1 ? filters.get(0) : new FilterList(filters));
351  }
352
353  @Override
354  protected void printUsage() {
355    StringBuilder footerBuilder = new StringBuilder();
356    footerBuilder.append("For performance, consider the following configuration properties:\n");
357    footerBuilder.append("-Dhbase.client.scanner.caching=100\n");
358    footerBuilder.append("-Dmapreduce.map.speculative=false\n");
359    printUsage("hbase rowcounter <tablename> [options] [<column1> <column2>...]", "Options:",
360      footerBuilder.toString());
361  }
362
363  @Override
364  protected void printUsage(final String usageStr, final String usageHeader,
365    final String usageFooter) {
366    HelpFormatter helpFormatter = new HelpFormatter();
367    helpFormatter.setWidth(120);
368    helpFormatter.setOptionComparator(new AbstractHBaseTool.OptionsOrderComparator());
369    helpFormatter.setLongOptSeparator("=");
370    helpFormatter.printHelp(usageStr, usageHeader, options, usageFooter);
371  }
372
373  @Override
374  protected void addOptions() {
375    Option startTimeOption = Option.builder(null).valueSeparator('=').hasArg(true)
376      .desc("starting time filter to start counting rows from.").longOpt(OPT_START_TIME).build();
377    Option endTimeOption = Option.builder(null).valueSeparator('=').hasArg(true)
378      .desc("end time filter limit, to only count rows up to this timestamp.").longOpt(OPT_END_TIME)
379      .build();
380    Option rangeOption = Option.builder(null).valueSeparator('=').hasArg(true)
381      .desc("[startKey],[endKey][;[startKey],[endKey]...]]").longOpt(OPT_RANGE).build();
382    Option expectedOption = Option.builder(null).valueSeparator('=').hasArg(true)
383      .desc("expected number of rows to be count.").longOpt(OPT_EXPECTED_COUNT).build();
384    Option countDeleteMarkersOption = Option.builder(null).hasArg(false)
385      .desc("counts the number of Delete Markers of all types, i.e. "
386        + "(DELETE, DELETE_COLUMN, DELETE_FAMILY, DELETE_FAMILY_VERSION)")
387      .longOpt(OPT_COUNT_DELETE_MARKERS).build();
388    addOption(startTimeOption);
389    addOption(endTimeOption);
390    addOption(rangeOption);
391    addOption(expectedOption);
392    addOption(countDeleteMarkersOption);
393  }
394
395  @Override
396  protected void processOptions(CommandLine cmd) throws IllegalArgumentException {
397    this.tableName = cmd.getArgList().get(0);
398    if (cmd.getOptionValue(OPT_RANGE) != null) {
399      this.rowRangeList = parseRowRangeParameter(cmd.getOptionValue(OPT_RANGE));
400    }
401    this.endTime = cmd.getOptionValue(OPT_END_TIME) == null
402      ? HConstants.LATEST_TIMESTAMP
403      : Long.parseLong(cmd.getOptionValue(OPT_END_TIME));
404    this.expectedCount = cmd.getOptionValue(OPT_EXPECTED_COUNT) == null
405      ? Long.MIN_VALUE
406      : Long.parseLong(cmd.getOptionValue(OPT_EXPECTED_COUNT));
407    this.startTime = cmd.getOptionValue(OPT_START_TIME) == null
408      ? 0
409      : Long.parseLong(cmd.getOptionValue(OPT_START_TIME));
410    this.countDeleteMarkers = cmd.hasOption(OPT_COUNT_DELETE_MARKERS);
411
412    for (int i = 1; i < cmd.getArgList().size(); i++) {
413      String argument = cmd.getArgList().get(i);
414      if (!argument.startsWith("-")) {
415        this.columns.add(argument);
416      }
417    }
418
419    if (endTime < startTime) {
420      throw new IllegalArgumentException(
421        "--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
422    }
423  }
424
425  @Override
426  protected void processOldArgs(List<String> args) {
427    List<String> copiedArgs = new ArrayList<>(args);
428    args.removeAll(copiedArgs);
429    for (String arg : copiedArgs) {
430      if (arg.startsWith("-") && arg.contains("=")) {
431        String[] kv = arg.split("=");
432        args.add(kv[0]);
433        args.add(kv[1]);
434      } else {
435        args.add(arg);
436      }
437    }
438  }
439
440  @Override
441  protected int doWork() throws Exception {
442    job = createSubmittableJob(getConf());
443    if (job == null) {
444      return -1;
445    }
446    boolean success = job.waitForCompletion(true);
447    final long expectedCount = getConf().getLong(EXPECTED_COUNT_KEY, -1);
448    if (success && expectedCount != -1) {
449      final Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS);
450      success = expectedCount == counter.getValue();
451      if (!success) {
452        LOG.error("Failing job because count of '" + counter.getValue()
453          + "' does not match expected count of '" + expectedCount + "'");
454      }
455    }
456    return (success ? 0 : 1);
457  }
458
459  /**
460   * Main entry point.
461   * @param args The command line parameters.
462   * @throws Exception When running the job fails.
463   */
464  public static void main(String[] args) throws Exception {
465    new RowCounter().doStaticMain(args);
466  }
467
468  static class RowCounterCommandLineParser extends BasicParser {
469
470    @Override
471    protected void checkRequiredOptions() throws MissingOptionException {
472      if (this.cmd.getArgList().size() < 1 || this.cmd.getArgList().get(0).startsWith("-")) {
473        throw new MissingOptionException("First argument must be a valid table name.");
474      }
475    }
476  }
477
478  @Override
479  protected CommandLineParser newParser() {
480    return new RowCounterCommandLineParser();
481  }
482
483  @RestrictedApi(explanation = "Only visible for testing", link = "",
484      allowedOnPath = ".*/src/test/.*")
485  Job getMapReduceJob() {
486    return job;
487  }
488
489}