001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import org.apache.commons.lang3.StringUtils;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.client.Result;
027import org.apache.hadoop.hbase.client.Scan;
028import org.apache.hadoop.hbase.filter.FilterBase;
029import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
030import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
031import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
032import org.apache.hadoop.hbase.util.AbstractHBaseTool;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.mapreduce.Counter;
035import org.apache.hadoop.mapreduce.Job;
036import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
042import org.apache.hbase.thirdparty.org.apache.commons.cli.BasicParser;
043import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
044import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
045import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
046import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException;
047import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
048
049/**
050 * A job with a just a map phase to count rows. Map outputs table rows IF the input row has columns
051 * that have content.
052 */
053@InterfaceAudience.Public
054public class RowCounter extends AbstractHBaseTool {
055
056  private static final Logger LOG = LoggerFactory.getLogger(RowCounter.class);
057
058  /** Name of this 'program'. */
059  static final String NAME = "rowcounter";
060
061  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
062  private final static String EXPECTED_COUNT_KEY = RowCounter.class.getName() + ".expected_count";
063
064  private final static String OPT_START_TIME = "starttime";
065  private final static String OPT_END_TIME = "endtime";
066  private final static String OPT_RANGE = "range";
067  private final static String OPT_EXPECTED_COUNT = "expectedCount";
068
069  private String tableName;
070  private List<MultiRowRangeFilter.RowRange> rowRangeList;
071  private long startTime;
072  private long endTime;
073  private long expectedCount;
074  private List<String> columns = new ArrayList<>();
075
076  /**
077   * Mapper that runs the count.
078   */
079  static class RowCounterMapper extends TableMapper<ImmutableBytesWritable, Result> {
080
081    /** Counter enumeration to count the actual rows. */
082    public static enum Counters {
083      ROWS
084    }
085
086    /**
087     * Maps the data.
088     * @param row     The current table row key.
089     * @param values  The columns.
090     * @param context The current context.
091     * @throws IOException When something is broken with the data.
092     * @see org.apache.hadoop.mapreduce.Mapper#map(Object, Object, Context)
093     */
094    @Override
095    public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {
096      // Count every row containing data, whether it's in qualifiers or values
097      context.getCounter(Counters.ROWS).increment(1);
098    }
099  }
100
101  /**
102   * Sets up the actual job.
103   * @param conf The current configuration.
104   * @return The newly created job.
105   * @throws IOException When setting up the job fails.
106   */
107  public Job createSubmittableJob(Configuration conf) throws IOException {
108    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
109    job.setJarByClass(RowCounter.class);
110    Scan scan = new Scan();
111    scan.setCacheBlocks(false);
112    setScanFilter(scan, rowRangeList);
113
114    for (String columnName : this.columns) {
115      String family = StringUtils.substringBefore(columnName, ":");
116      String qualifier = StringUtils.substringAfter(columnName, ":");
117      if (StringUtils.isBlank(qualifier)) {
118        scan.addFamily(Bytes.toBytes(family));
119      } else {
120        scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
121      }
122    }
123
124    if (this.expectedCount >= 0) {
125      conf.setLong(EXPECTED_COUNT_KEY, this.expectedCount);
126    }
127
128    scan.setTimeRange(startTime, endTime);
129    job.setOutputFormatClass(NullOutputFormat.class);
130    TableMapReduceUtil.initTableMapperJob(tableName, scan, RowCounterMapper.class,
131      ImmutableBytesWritable.class, Result.class, job);
132    job.setNumReduceTasks(0);
133    return job;
134  }
135
136  /**
137   * Sets up the actual job.
138   * @param conf The current configuration.
139   * @param args The command line parameters.
140   * @return The newly created job.
141   * @throws IOException When setting up the job fails.
142   * @deprecated as of release 2.3.0. Will be removed on 4.0.0. Please use main method instead.
143   */
144  @Deprecated
145  public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
146    String tableName = args[0];
147    List<MultiRowRangeFilter.RowRange> rowRangeList = null;
148    long startTime = 0;
149    long endTime = 0;
150
151    StringBuilder sb = new StringBuilder();
152
153    final String rangeSwitch = "--range=";
154    final String startTimeArgKey = "--starttime=";
155    final String endTimeArgKey = "--endtime=";
156    final String expectedCountArg = "--expected-count=";
157
158    // First argument is table name, starting from second
159    for (int i = 1; i < args.length; i++) {
160      if (args[i].startsWith(rangeSwitch)) {
161        try {
162          rowRangeList = parseRowRangeParameter(
163            args[i].substring(args[1].indexOf(rangeSwitch) + rangeSwitch.length()));
164        } catch (IllegalArgumentException e) {
165          return null;
166        }
167        continue;
168      }
169      if (args[i].startsWith(startTimeArgKey)) {
170        startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
171        continue;
172      }
173      if (args[i].startsWith(endTimeArgKey)) {
174        endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
175        continue;
176      }
177      if (args[i].startsWith(expectedCountArg)) {
178        conf.setLong(EXPECTED_COUNT_KEY,
179          Long.parseLong(args[i].substring(expectedCountArg.length())));
180        continue;
181      }
182      // if no switch, assume column names
183      sb.append(args[i]);
184      sb.append(" ");
185    }
186    if (endTime < startTime) {
187      printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
188      return null;
189    }
190
191    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
192    job.setJarByClass(RowCounter.class);
193    Scan scan = new Scan();
194    scan.setCacheBlocks(false);
195    setScanFilter(scan, rowRangeList);
196    if (sb.length() > 0) {
197      for (String columnName : sb.toString().trim().split(" ")) {
198        String family = StringUtils.substringBefore(columnName, ":");
199        String qualifier = StringUtils.substringAfter(columnName, ":");
200
201        if (StringUtils.isBlank(qualifier)) {
202          scan.addFamily(Bytes.toBytes(family));
203        } else {
204          scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
205        }
206      }
207    }
208    scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
209    job.setOutputFormatClass(NullOutputFormat.class);
210    TableMapReduceUtil.initTableMapperJob(tableName, scan, RowCounterMapper.class,
211      ImmutableBytesWritable.class, Result.class, job);
212    job.setNumReduceTasks(0);
213    return job;
214  }
215
216  /**
217   * Prints usage without error message. Note that we don't document --expected-count, because it's
218   * intended for test.
219   */
220  private static void printUsage(String errorMessage) {
221    System.err.println("ERROR: " + errorMessage);
222    System.err.println(
223      "Usage: hbase rowcounter [options] <tablename> " + "[--starttime=<start> --endtime=<end>] "
224        + "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [<column1> <column2>...]");
225    System.err.println("For performance consider the following options:\n"
226      + "-Dhbase.client.scanner.caching=100\n" + "-Dmapreduce.map.speculative=false");
227  }
228
229  private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(String arg) {
230    final List<String> rangesSplit = Splitter.on(";").splitToList(arg);
231    final List<MultiRowRangeFilter.RowRange> rangeList = new ArrayList<>();
232    for (String range : rangesSplit) {
233      if (range != null && !range.isEmpty()) {
234        List<String> startEnd = Splitter.on(",").splitToList(range);
235        if (startEnd.size() != 2 || startEnd.get(1).contains(",")) {
236          throw new IllegalArgumentException("Wrong range specification: " + range);
237        }
238        String startKey = startEnd.get(0);
239        String endKey = startEnd.get(1);
240        rangeList.add(new MultiRowRangeFilter.RowRange(Bytes.toBytesBinary(startKey), true,
241          Bytes.toBytesBinary(endKey), false));
242      }
243    }
244    return rangeList;
245  }
246
247  /**
248   * Sets filter {@link FilterBase} to the {@link Scan} instance. If provided rowRangeList contains
249   * more than one element, method sets filter which is instance of {@link MultiRowRangeFilter}.
250   * Otherwise, method sets filter which is instance of {@link FirstKeyOnlyFilter}. If rowRangeList
251   * contains exactly one element, startRow and stopRow are set to the scan.
252   */
253  private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList) {
254    final int size = rowRangeList == null ? 0 : rowRangeList.size();
255    if (size <= 1) {
256      scan.setFilter(new FirstKeyOnlyFilter());
257    }
258    if (size == 1) {
259      MultiRowRangeFilter.RowRange range = rowRangeList.get(0);
260      scan.setStartRow(range.getStartRow()); // inclusive
261      scan.setStopRow(range.getStopRow()); // exclusive
262    } else if (size > 1) {
263      scan.setFilter(new MultiRowRangeFilter(rowRangeList));
264    }
265  }
266
267  @Override
268  protected void printUsage() {
269    StringBuilder footerBuilder = new StringBuilder();
270    footerBuilder.append("For performance, consider the following configuration properties:\n");
271    footerBuilder.append("-Dhbase.client.scanner.caching=100\n");
272    footerBuilder.append("-Dmapreduce.map.speculative=false\n");
273    printUsage("hbase rowcounter <tablename> [options] [<column1> <column2>...]", "Options:",
274      footerBuilder.toString());
275  }
276
277  @Override
278  protected void printUsage(final String usageStr, final String usageHeader,
279    final String usageFooter) {
280    HelpFormatter helpFormatter = new HelpFormatter();
281    helpFormatter.setWidth(120);
282    helpFormatter.setOptionComparator(new AbstractHBaseTool.OptionsOrderComparator());
283    helpFormatter.setLongOptSeparator("=");
284    helpFormatter.printHelp(usageStr, usageHeader, options, usageFooter);
285  }
286
287  @Override
288  protected void addOptions() {
289    Option startTimeOption = Option.builder(null).valueSeparator('=').hasArg(true)
290      .desc("starting time filter to start counting rows from.").longOpt(OPT_START_TIME).build();
291    Option endTimeOption = Option.builder(null).valueSeparator('=').hasArg(true)
292      .desc("end time filter limit, to only count rows up to this timestamp.").longOpt(OPT_END_TIME)
293      .build();
294    Option rangeOption = Option.builder(null).valueSeparator('=').hasArg(true)
295      .desc("[startKey],[endKey][;[startKey],[endKey]...]]").longOpt(OPT_RANGE).build();
296    Option expectedOption = Option.builder(null).valueSeparator('=').hasArg(true)
297      .desc("expected number of rows to be count.").longOpt(OPT_EXPECTED_COUNT).build();
298    addOption(startTimeOption);
299    addOption(endTimeOption);
300    addOption(rangeOption);
301    addOption(expectedOption);
302  }
303
304  @Override
305  protected void processOptions(CommandLine cmd) throws IllegalArgumentException {
306    this.tableName = cmd.getArgList().get(0);
307    if (cmd.getOptionValue(OPT_RANGE) != null) {
308      this.rowRangeList = parseRowRangeParameter(cmd.getOptionValue(OPT_RANGE));
309    }
310    this.endTime = cmd.getOptionValue(OPT_END_TIME) == null
311      ? HConstants.LATEST_TIMESTAMP
312      : Long.parseLong(cmd.getOptionValue(OPT_END_TIME));
313    this.expectedCount = cmd.getOptionValue(OPT_EXPECTED_COUNT) == null
314      ? Long.MIN_VALUE
315      : Long.parseLong(cmd.getOptionValue(OPT_EXPECTED_COUNT));
316    this.startTime = cmd.getOptionValue(OPT_START_TIME) == null
317      ? 0
318      : Long.parseLong(cmd.getOptionValue(OPT_START_TIME));
319
320    for (int i = 1; i < cmd.getArgList().size(); i++) {
321      String argument = cmd.getArgList().get(i);
322      if (!argument.startsWith("-")) {
323        this.columns.add(argument);
324      }
325    }
326
327    if (endTime < startTime) {
328      throw new IllegalArgumentException(
329        "--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
330    }
331  }
332
333  @Override
334  protected void processOldArgs(List<String> args) {
335    List<String> copiedArgs = new ArrayList<>(args);
336    args.removeAll(copiedArgs);
337    for (String arg : copiedArgs) {
338      if (arg.startsWith("-") && arg.contains("=")) {
339        String[] kv = arg.split("=");
340        args.add(kv[0]);
341        args.add(kv[1]);
342      } else {
343        args.add(arg);
344      }
345    }
346  }
347
348  @Override
349  protected int doWork() throws Exception {
350    Job job = createSubmittableJob(getConf());
351    if (job == null) {
352      return -1;
353    }
354    boolean success = job.waitForCompletion(true);
355    final long expectedCount = getConf().getLong(EXPECTED_COUNT_KEY, -1);
356    if (success && expectedCount != -1) {
357      final Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS);
358      success = expectedCount == counter.getValue();
359      if (!success) {
360        LOG.error("Failing job because count of '" + counter.getValue()
361          + "' does not match expected count of '" + expectedCount + "'");
362      }
363    }
364    return (success ? 0 : 1);
365  }
366
367  /**
368   * Main entry point.
369   * @param args The command line parameters.
370   * @throws Exception When running the job fails.
371   */
372  public static void main(String[] args) throws Exception {
373    new RowCounter().doStaticMain(args);
374  }
375
376  static class RowCounterCommandLineParser extends BasicParser {
377
378    @Override
379    protected void checkRequiredOptions() throws MissingOptionException {
380      if (this.cmd.getArgList().size() < 1 || this.cmd.getArgList().get(0).startsWith("-")) {
381        throw new MissingOptionException("First argument must be a valid table name.");
382      }
383    }
384  }
385
386  @Override
387  protected CommandLineParser newParser() {
388    return new RowCounterCommandLineParser();
389  }
390
391}