001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024import org.apache.commons.lang3.StringUtils; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.client.Result; 029import org.apache.hadoop.hbase.client.Scan; 030import org.apache.hadoop.hbase.filter.Filter; 031import org.apache.hadoop.hbase.filter.FilterBase; 032import org.apache.hadoop.hbase.filter.FilterList; 033import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 034import org.apache.hadoop.hbase.filter.KeyOnlyFilter; 035import org.apache.hadoop.hbase.filter.MultiRowRangeFilter; 036import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 037import org.apache.hadoop.hbase.util.AbstractHBaseTool; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.mapreduce.Counter; 040import org.apache.hadoop.mapreduce.Job; 041import org.apache.hadoop.mapreduce.Mapper; 042import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 048import org.apache.hbase.thirdparty.org.apache.commons.cli.BasicParser; 049import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 050import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser; 051import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; 052import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException; 053import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 054 055/** 056 * A job with a just a map phase to count rows. Map outputs table rows IF the input row has columns 057 * that have content. 058 */ 059@InterfaceAudience.Public 060public class RowCounter extends AbstractHBaseTool { 061 062 private static final Logger LOG = LoggerFactory.getLogger(RowCounter.class); 063 064 /** Name of this 'program'. */ 065 static final String NAME = "rowcounter"; 066 067 private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; 068 private final static String EXPECTED_COUNT_KEY = RowCounter.class.getName() + ".expected_count"; 069 070 private final static String OPT_START_TIME = "starttime"; 071 private final static String OPT_END_TIME = "endtime"; 072 private final static String OPT_RANGE = "range"; 073 private final static String OPT_EXPECTED_COUNT = "expectedCount"; 074 private final static String OPT_COUNT_DELETE_MARKERS = "countDeleteMarkers"; 075 076 private String tableName; 077 private List<MultiRowRangeFilter.RowRange> rowRangeList; 078 private long startTime; 079 private long endTime; 080 private long expectedCount; 081 private boolean countDeleteMarkers; 082 private List<String> columns = new ArrayList<>(); 083 084 private Job job; 085 086 /** 087 * Mapper that runs the count. 088 */ 089 static class RowCounterMapper extends TableMapper<ImmutableBytesWritable, Result> { 090 091 /** Counter enumeration to count the actual rows, cells and delete markers. */ 092 public static enum Counters { 093 ROWS, 094 DELETE, 095 DELETE_COLUMN, 096 DELETE_FAMILY, 097 DELETE_FAMILY_VERSION, 098 ROWS_WITH_DELETE_MARKER 099 } 100 101 private boolean countDeleteMarkers; 102 103 @Override 104 protected void 105 setup(Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Result>.Context context) 106 throws IOException, InterruptedException { 107 Configuration conf = context.getConfiguration(); 108 countDeleteMarkers = conf.getBoolean(OPT_COUNT_DELETE_MARKERS, false); 109 } 110 111 /** 112 * Maps the data. 113 * @param row The current table row key. 114 * @param values The columns. 115 * @param context The current context. 116 * @throws IOException When something is broken with the data. 117 * @see org.apache.hadoop.mapreduce.Mapper#map(Object, Object, Context) 118 */ 119 @Override 120 public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException { 121 // Count every row containing data, whether it's in qualifiers or values 122 context.getCounter(Counters.ROWS).increment(1); 123 124 if (countDeleteMarkers) { 125 boolean rowContainsDeleteMarker = false; 126 for (Cell cell : values.rawCells()) { 127 Cell.Type type = cell.getType(); 128 switch (type) { 129 case Delete: 130 rowContainsDeleteMarker = true; 131 context.getCounter(Counters.DELETE).increment(1); 132 break; 133 case DeleteColumn: 134 rowContainsDeleteMarker = true; 135 context.getCounter(Counters.DELETE_COLUMN).increment(1); 136 break; 137 case DeleteFamily: 138 rowContainsDeleteMarker = true; 139 context.getCounter(Counters.DELETE_FAMILY).increment(1); 140 break; 141 case DeleteFamilyVersion: 142 rowContainsDeleteMarker = true; 143 context.getCounter(Counters.DELETE_FAMILY_VERSION).increment(1); 144 break; 145 default: 146 break; 147 } 148 } 149 150 if (rowContainsDeleteMarker) { 151 context.getCounter(Counters.ROWS_WITH_DELETE_MARKER).increment(1); 152 } 153 } 154 } 155 } 156 157 /** 158 * Sets up the actual job. 159 * @param conf The current configuration. 160 * @return The newly created job. 161 * @throws IOException When setting up the job fails. 162 */ 163 public Job createSubmittableJob(Configuration conf) throws IOException { 164 conf.setBoolean(OPT_COUNT_DELETE_MARKERS, this.countDeleteMarkers); 165 Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); 166 job.setJarByClass(RowCounter.class); 167 Scan scan = new Scan(); 168 // raw scan will be needed to account for delete markers when --countDeleteMarkers flag is set 169 scan.setRaw(this.countDeleteMarkers); 170 scan.setCacheBlocks(false); 171 setScanFilter(scan, rowRangeList, this.countDeleteMarkers); 172 173 for (String columnName : this.columns) { 174 String family = StringUtils.substringBefore(columnName, ":"); 175 String qualifier = StringUtils.substringAfter(columnName, ":"); 176 if (StringUtils.isBlank(qualifier)) { 177 scan.addFamily(Bytes.toBytes(family)); 178 } else { 179 scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); 180 } 181 } 182 183 if (this.expectedCount >= 0) { 184 conf.setLong(EXPECTED_COUNT_KEY, this.expectedCount); 185 } 186 187 scan.setTimeRange(startTime, endTime); 188 job.setOutputFormatClass(NullOutputFormat.class); 189 TableMapReduceUtil.initTableMapperJob(tableName, scan, RowCounterMapper.class, 190 ImmutableBytesWritable.class, Result.class, job); 191 job.setNumReduceTasks(0); 192 return job; 193 } 194 195 /** 196 * Sets up the actual job. 197 * @param conf The current configuration. 198 * @param args The command line parameters. 199 * @return The newly created job. 200 * @throws IOException When setting up the job fails. 201 * @deprecated as of release 2.3.0. Will be removed on 4.0.0. Please use main method instead. 202 */ 203 @Deprecated 204 public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException { 205 String tableName = args[0]; 206 List<MultiRowRangeFilter.RowRange> rowRangeList = null; 207 long startTime = 0; 208 long endTime = 0; 209 boolean countDeleteMarkers = false; 210 211 StringBuilder sb = new StringBuilder(); 212 213 final String rangeSwitch = "--range="; 214 final String startTimeArgKey = "--starttime="; 215 final String endTimeArgKey = "--endtime="; 216 final String expectedCountArg = "--expected-count="; 217 final String countDeleteMarkersArg = "--countDeleteMarkers"; 218 219 // First argument is table name, starting from second 220 for (int i = 1; i < args.length; i++) { 221 if (args[i].startsWith(rangeSwitch)) { 222 try { 223 rowRangeList = parseRowRangeParameter( 224 args[i].substring(args[1].indexOf(rangeSwitch) + rangeSwitch.length())); 225 } catch (IllegalArgumentException e) { 226 return null; 227 } 228 continue; 229 } 230 if (args[i].startsWith(startTimeArgKey)) { 231 startTime = Long.parseLong(args[i].substring(startTimeArgKey.length())); 232 continue; 233 } 234 if (args[i].startsWith(endTimeArgKey)) { 235 endTime = Long.parseLong(args[i].substring(endTimeArgKey.length())); 236 continue; 237 } 238 if (args[i].startsWith(expectedCountArg)) { 239 conf.setLong(EXPECTED_COUNT_KEY, 240 Long.parseLong(args[i].substring(expectedCountArg.length()))); 241 continue; 242 } 243 if (args[i].startsWith(countDeleteMarkersArg)) { 244 countDeleteMarkers = true; 245 continue; 246 } 247 // if no switch, assume column names 248 sb.append(args[i]); 249 sb.append(" "); 250 } 251 conf.setBoolean(OPT_COUNT_DELETE_MARKERS, countDeleteMarkers); 252 if (endTime < startTime) { 253 printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime); 254 return null; 255 } 256 257 Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); 258 job.setJarByClass(RowCounter.class); 259 Scan scan = new Scan(); 260 scan.setCacheBlocks(false); 261 // raw scan will be needed to account for delete markers when --countDeleteMarkers flag is set 262 scan.setRaw(countDeleteMarkers); 263 setScanFilter(scan, rowRangeList, countDeleteMarkers); 264 if (sb.length() > 0) { 265 for (String columnName : sb.toString().trim().split(" ")) { 266 String family = StringUtils.substringBefore(columnName, ":"); 267 String qualifier = StringUtils.substringAfter(columnName, ":"); 268 269 if (StringUtils.isBlank(qualifier)) { 270 scan.addFamily(Bytes.toBytes(family)); 271 } else { 272 scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); 273 } 274 } 275 } 276 scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime); 277 job.setOutputFormatClass(NullOutputFormat.class); 278 TableMapReduceUtil.initTableMapperJob(tableName, scan, RowCounterMapper.class, 279 ImmutableBytesWritable.class, Result.class, job); 280 job.setNumReduceTasks(0); 281 return job; 282 } 283 284 /** 285 * Prints usage without error message. Note that we don't document --expected-count, because it's 286 * intended for test. 287 */ 288 private static void printUsage(String errorMessage) { 289 System.err.println("ERROR: " + errorMessage); 290 System.err.println( 291 "Usage: hbase rowcounter [options] <tablename> " + "[--starttime=<start> --endtime=<end>] " 292 + "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [<column1> <column2>...]"); 293 System.err.println("For performance consider the following options:\n" 294 + "-Dhbase.client.scanner.caching=100\n" + "-Dmapreduce.map.speculative=false"); 295 } 296 297 private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(String arg) { 298 final List<String> rangesSplit = Splitter.on(";").splitToList(arg); 299 final List<MultiRowRangeFilter.RowRange> rangeList = new ArrayList<>(); 300 for (String range : rangesSplit) { 301 if (range != null && !range.isEmpty()) { 302 List<String> startEnd = Splitter.on(",").splitToList(range); 303 if (startEnd.size() != 2 || startEnd.get(1).contains(",")) { 304 throw new IllegalArgumentException("Wrong range specification: " + range); 305 } 306 String startKey = startEnd.get(0); 307 String endKey = startEnd.get(1); 308 rangeList.add(new MultiRowRangeFilter.RowRange(Bytes.toBytesBinary(startKey), true, 309 Bytes.toBytesBinary(endKey), false)); 310 } 311 } 312 return rangeList; 313 } 314 315 /** 316 * Sets filter {@link FilterBase} to the {@link Scan} instance. If provided rowRangeList contains 317 * more than one element, method sets filter which is instance of {@link MultiRowRangeFilter}. If 318 * rowRangeList contains exactly one element, startRow and stopRow are set to the scan. Also, 319 * method may apply {@link FirstKeyOnlyFilter} an {@link KeyOnlyFilter} for better performance. 320 */ 321 private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList, 322 boolean countDeleteMarkers) { 323 List<Filter> filters = new ArrayList<>(); 324 325 // Apply filters for better performance. 326 if (!countDeleteMarkers) { 327 // We only need one cell per row, unless --countDeleteMarkers flag is set. 328 filters.add(new FirstKeyOnlyFilter()); 329 330 // We're not interested in values. Use KeyOnlyFilter. 331 // NOTE: Logically, KeyOnlyFilter should be okay for --countDeleteMarkers, because it should 332 // only empty the values, but currently, having a filter changes the behavior of a raw scan. 333 // So we don't use it. See {@link UserScanQueryMatcher#mergeFilterResponse} for more details. 334 filters.add(new KeyOnlyFilter()); 335 } 336 337 // Depending on the number of ranges, set start/stop row or apply MultiRowRangeFilter. 338 final int size = rowRangeList == null ? 0 : rowRangeList.size(); 339 if (size == 1) { 340 MultiRowRangeFilter.RowRange range = rowRangeList.get(0); 341 scan.withStartRow(range.getStartRow()); // inclusive 342 scan.withStopRow(range.getStopRow()); // exclusive 343 } else if (size > 1) { 344 filters.add(new MultiRowRangeFilter(rowRangeList)); 345 } 346 347 if (filters.isEmpty()) { 348 return; 349 } 350 scan.setFilter(filters.size() == 1 ? filters.get(0) : new FilterList(filters)); 351 } 352 353 @Override 354 protected void printUsage() { 355 StringBuilder footerBuilder = new StringBuilder(); 356 footerBuilder.append("For performance, consider the following configuration properties:\n"); 357 footerBuilder.append("-Dhbase.client.scanner.caching=100\n"); 358 footerBuilder.append("-Dmapreduce.map.speculative=false\n"); 359 printUsage("hbase rowcounter <tablename> [options] [<column1> <column2>...]", "Options:", 360 footerBuilder.toString()); 361 } 362 363 @Override 364 protected void printUsage(final String usageStr, final String usageHeader, 365 final String usageFooter) { 366 HelpFormatter helpFormatter = new HelpFormatter(); 367 helpFormatter.setWidth(120); 368 helpFormatter.setOptionComparator(new AbstractHBaseTool.OptionsOrderComparator()); 369 helpFormatter.setLongOptSeparator("="); 370 helpFormatter.printHelp(usageStr, usageHeader, options, usageFooter); 371 } 372 373 @Override 374 protected void addOptions() { 375 Option startTimeOption = Option.builder(null).valueSeparator('=').hasArg(true) 376 .desc("starting time filter to start counting rows from.").longOpt(OPT_START_TIME).build(); 377 Option endTimeOption = Option.builder(null).valueSeparator('=').hasArg(true) 378 .desc("end time filter limit, to only count rows up to this timestamp.").longOpt(OPT_END_TIME) 379 .build(); 380 Option rangeOption = Option.builder(null).valueSeparator('=').hasArg(true) 381 .desc("[startKey],[endKey][;[startKey],[endKey]...]]").longOpt(OPT_RANGE).build(); 382 Option expectedOption = Option.builder(null).valueSeparator('=').hasArg(true) 383 .desc("expected number of rows to be count.").longOpt(OPT_EXPECTED_COUNT).build(); 384 Option countDeleteMarkersOption = Option.builder(null).hasArg(false) 385 .desc("counts the number of Delete Markers of all types, i.e. " 386 + "(DELETE, DELETE_COLUMN, DELETE_FAMILY, DELETE_FAMILY_VERSION)") 387 .longOpt(OPT_COUNT_DELETE_MARKERS).build(); 388 addOption(startTimeOption); 389 addOption(endTimeOption); 390 addOption(rangeOption); 391 addOption(expectedOption); 392 addOption(countDeleteMarkersOption); 393 } 394 395 @Override 396 protected void processOptions(CommandLine cmd) throws IllegalArgumentException { 397 this.tableName = cmd.getArgList().get(0); 398 if (cmd.getOptionValue(OPT_RANGE) != null) { 399 this.rowRangeList = parseRowRangeParameter(cmd.getOptionValue(OPT_RANGE)); 400 } 401 this.endTime = cmd.getOptionValue(OPT_END_TIME) == null 402 ? HConstants.LATEST_TIMESTAMP 403 : Long.parseLong(cmd.getOptionValue(OPT_END_TIME)); 404 this.expectedCount = cmd.getOptionValue(OPT_EXPECTED_COUNT) == null 405 ? Long.MIN_VALUE 406 : Long.parseLong(cmd.getOptionValue(OPT_EXPECTED_COUNT)); 407 this.startTime = cmd.getOptionValue(OPT_START_TIME) == null 408 ? 0 409 : Long.parseLong(cmd.getOptionValue(OPT_START_TIME)); 410 this.countDeleteMarkers = cmd.hasOption(OPT_COUNT_DELETE_MARKERS); 411 412 for (int i = 1; i < cmd.getArgList().size(); i++) { 413 String argument = cmd.getArgList().get(i); 414 if (!argument.startsWith("-")) { 415 this.columns.add(argument); 416 } 417 } 418 419 if (endTime < startTime) { 420 throw new IllegalArgumentException( 421 "--endtime=" + endTime + " needs to be greater than --starttime=" + startTime); 422 } 423 } 424 425 @Override 426 protected void processOldArgs(List<String> args) { 427 List<String> copiedArgs = new ArrayList<>(args); 428 args.removeAll(copiedArgs); 429 for (String arg : copiedArgs) { 430 if (arg.startsWith("-") && arg.contains("=")) { 431 String[] kv = arg.split("="); 432 args.add(kv[0]); 433 args.add(kv[1]); 434 } else { 435 args.add(arg); 436 } 437 } 438 } 439 440 @Override 441 protected int doWork() throws Exception { 442 job = createSubmittableJob(getConf()); 443 if (job == null) { 444 return -1; 445 } 446 boolean success = job.waitForCompletion(true); 447 final long expectedCount = getConf().getLong(EXPECTED_COUNT_KEY, -1); 448 if (success && expectedCount != -1) { 449 final Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS); 450 success = expectedCount == counter.getValue(); 451 if (!success) { 452 LOG.error("Failing job because count of '" + counter.getValue() 453 + "' does not match expected count of '" + expectedCount + "'"); 454 } 455 } 456 return (success ? 0 : 1); 457 } 458 459 /** 460 * Main entry point. 461 * @param args The command line parameters. 462 * @throws Exception When running the job fails. 463 */ 464 public static void main(String[] args) throws Exception { 465 new RowCounter().doStaticMain(args); 466 } 467 468 static class RowCounterCommandLineParser extends BasicParser { 469 470 @Override 471 protected void checkRequiredOptions() throws MissingOptionException { 472 if (this.cmd.getArgList().size() < 1 || this.cmd.getArgList().get(0).startsWith("-")) { 473 throw new MissingOptionException("First argument must be a valid table name."); 474 } 475 } 476 } 477 478 @Override 479 protected CommandLineParser newParser() { 480 return new RowCounterCommandLineParser(); 481 } 482 483 @RestrictedApi(explanation = "Only visible for testing", link = "", 484 allowedOnPath = ".*/src/test/.*") 485 Job getMapReduceJob() { 486 return job; 487 } 488 489}