001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024import org.apache.commons.lang3.StringUtils; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.client.Result; 029import org.apache.hadoop.hbase.client.Scan; 030import org.apache.hadoop.hbase.filter.FilterBase; 031import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 032import org.apache.hadoop.hbase.filter.MultiRowRangeFilter; 033import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 034import org.apache.hadoop.hbase.util.AbstractHBaseTool; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.mapreduce.Counter; 037import org.apache.hadoop.mapreduce.Job; 038import org.apache.hadoop.mapreduce.Mapper; 039import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 045import org.apache.hbase.thirdparty.org.apache.commons.cli.BasicParser; 046import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 047import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser; 048import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; 049import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException; 050import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 051 052/** 053 * A job with a just a map phase to count rows. Map outputs table rows IF the input row has columns 054 * that have content. 055 */ 056@InterfaceAudience.Public 057public class RowCounter extends AbstractHBaseTool { 058 059 private static final Logger LOG = LoggerFactory.getLogger(RowCounter.class); 060 061 /** Name of this 'program'. */ 062 static final String NAME = "rowcounter"; 063 064 private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; 065 private final static String EXPECTED_COUNT_KEY = RowCounter.class.getName() + ".expected_count"; 066 067 private final static String OPT_START_TIME = "starttime"; 068 private final static String OPT_END_TIME = "endtime"; 069 private final static String OPT_RANGE = "range"; 070 private final static String OPT_EXPECTED_COUNT = "expectedCount"; 071 private final static String OPT_COUNT_DELETE_MARKERS = "countDeleteMarkers"; 072 073 private String tableName; 074 private List<MultiRowRangeFilter.RowRange> rowRangeList; 075 private long startTime; 076 private long endTime; 077 private long expectedCount; 078 private boolean countDeleteMarkers; 079 private List<String> columns = new ArrayList<>(); 080 081 private Job job; 082 083 /** 084 * Mapper that runs the count. 085 */ 086 static class RowCounterMapper extends TableMapper<ImmutableBytesWritable, Result> { 087 088 /** Counter enumeration to count the actual rows, cells and delete markers. */ 089 public static enum Counters { 090 ROWS, 091 DELETE, 092 DELETE_COLUMN, 093 DELETE_FAMILY, 094 DELETE_FAMILY_VERSION, 095 ROWS_WITH_DELETE_MARKER 096 } 097 098 private boolean countDeleteMarkers; 099 100 @Override 101 protected void 102 setup(Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Result>.Context context) 103 throws IOException, InterruptedException { 104 Configuration conf = context.getConfiguration(); 105 countDeleteMarkers = conf.getBoolean(OPT_COUNT_DELETE_MARKERS, false); 106 } 107 108 /** 109 * Maps the data. 110 * @param row The current table row key. 111 * @param values The columns. 112 * @param context The current context. 113 * @throws IOException When something is broken with the data. 114 * @see org.apache.hadoop.mapreduce.Mapper#map(Object, Object, Context) 115 */ 116 @Override 117 public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException { 118 // Count every row containing data, whether it's in qualifiers or values 119 context.getCounter(Counters.ROWS).increment(1); 120 121 if (countDeleteMarkers) { 122 boolean rowContainsDeleteMarker = false; 123 for (Cell cell : values.rawCells()) { 124 Cell.Type type = cell.getType(); 125 switch (type) { 126 case Delete: 127 rowContainsDeleteMarker = true; 128 context.getCounter(Counters.DELETE).increment(1); 129 break; 130 case DeleteColumn: 131 rowContainsDeleteMarker = true; 132 context.getCounter(Counters.DELETE_COLUMN).increment(1); 133 break; 134 case DeleteFamily: 135 rowContainsDeleteMarker = true; 136 context.getCounter(Counters.DELETE_FAMILY).increment(1); 137 break; 138 case DeleteFamilyVersion: 139 rowContainsDeleteMarker = true; 140 context.getCounter(Counters.DELETE_FAMILY_VERSION).increment(1); 141 break; 142 default: 143 break; 144 } 145 } 146 147 if (rowContainsDeleteMarker) { 148 context.getCounter(Counters.ROWS_WITH_DELETE_MARKER).increment(1); 149 } 150 } 151 } 152 } 153 154 /** 155 * Sets up the actual job. 156 * @param conf The current configuration. 157 * @return The newly created job. 158 * @throws IOException When setting up the job fails. 159 */ 160 public Job createSubmittableJob(Configuration conf) throws IOException { 161 conf.setBoolean(OPT_COUNT_DELETE_MARKERS, this.countDeleteMarkers); 162 Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); 163 job.setJarByClass(RowCounter.class); 164 Scan scan = new Scan(); 165 // raw scan will be needed to account for delete markers when --countDeleteMarkers flag is set 166 scan.setRaw(this.countDeleteMarkers); 167 scan.setCacheBlocks(false); 168 setScanFilter(scan, rowRangeList, this.countDeleteMarkers); 169 170 for (String columnName : this.columns) { 171 String family = StringUtils.substringBefore(columnName, ":"); 172 String qualifier = StringUtils.substringAfter(columnName, ":"); 173 if (StringUtils.isBlank(qualifier)) { 174 scan.addFamily(Bytes.toBytes(family)); 175 } else { 176 scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); 177 } 178 } 179 180 if (this.expectedCount >= 0) { 181 conf.setLong(EXPECTED_COUNT_KEY, this.expectedCount); 182 } 183 184 scan.setTimeRange(startTime, endTime); 185 job.setOutputFormatClass(NullOutputFormat.class); 186 TableMapReduceUtil.initTableMapperJob(tableName, scan, RowCounterMapper.class, 187 ImmutableBytesWritable.class, Result.class, job); 188 job.setNumReduceTasks(0); 189 return job; 190 } 191 192 /** 193 * Sets up the actual job. 194 * @param conf The current configuration. 195 * @param args The command line parameters. 196 * @return The newly created job. 197 * @throws IOException When setting up the job fails. 198 * @deprecated as of release 2.3.0. Will be removed on 4.0.0. Please use main method instead. 199 */ 200 @Deprecated 201 public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException { 202 String tableName = args[0]; 203 List<MultiRowRangeFilter.RowRange> rowRangeList = null; 204 long startTime = 0; 205 long endTime = 0; 206 boolean countDeleteMarkers = false; 207 208 StringBuilder sb = new StringBuilder(); 209 210 final String rangeSwitch = "--range="; 211 final String startTimeArgKey = "--starttime="; 212 final String endTimeArgKey = "--endtime="; 213 final String expectedCountArg = "--expected-count="; 214 final String countDeleteMarkersArg = "--countDeleteMarkers"; 215 216 // First argument is table name, starting from second 217 for (int i = 1; i < args.length; i++) { 218 if (args[i].startsWith(rangeSwitch)) { 219 try { 220 rowRangeList = parseRowRangeParameter( 221 args[i].substring(args[1].indexOf(rangeSwitch) + rangeSwitch.length())); 222 } catch (IllegalArgumentException e) { 223 return null; 224 } 225 continue; 226 } 227 if (args[i].startsWith(startTimeArgKey)) { 228 startTime = Long.parseLong(args[i].substring(startTimeArgKey.length())); 229 continue; 230 } 231 if (args[i].startsWith(endTimeArgKey)) { 232 endTime = Long.parseLong(args[i].substring(endTimeArgKey.length())); 233 continue; 234 } 235 if (args[i].startsWith(expectedCountArg)) { 236 conf.setLong(EXPECTED_COUNT_KEY, 237 Long.parseLong(args[i].substring(expectedCountArg.length()))); 238 continue; 239 } 240 if (args[i].startsWith(countDeleteMarkersArg)) { 241 countDeleteMarkers = true; 242 continue; 243 } 244 // if no switch, assume column names 245 sb.append(args[i]); 246 sb.append(" "); 247 } 248 conf.setBoolean(OPT_COUNT_DELETE_MARKERS, countDeleteMarkers); 249 if (endTime < startTime) { 250 printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime); 251 return null; 252 } 253 254 Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); 255 job.setJarByClass(RowCounter.class); 256 Scan scan = new Scan(); 257 scan.setCacheBlocks(false); 258 // raw scan will be needed to account for delete markers when --countDeleteMarkers flag is set 259 scan.setRaw(countDeleteMarkers); 260 setScanFilter(scan, rowRangeList, countDeleteMarkers); 261 if (sb.length() > 0) { 262 for (String columnName : sb.toString().trim().split(" ")) { 263 String family = StringUtils.substringBefore(columnName, ":"); 264 String qualifier = StringUtils.substringAfter(columnName, ":"); 265 266 if (StringUtils.isBlank(qualifier)) { 267 scan.addFamily(Bytes.toBytes(family)); 268 } else { 269 scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); 270 } 271 } 272 } 273 scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime); 274 job.setOutputFormatClass(NullOutputFormat.class); 275 TableMapReduceUtil.initTableMapperJob(tableName, scan, RowCounterMapper.class, 276 ImmutableBytesWritable.class, Result.class, job); 277 job.setNumReduceTasks(0); 278 return job; 279 } 280 281 /** 282 * Prints usage without error message. Note that we don't document --expected-count, because it's 283 * intended for test. 284 */ 285 private static void printUsage(String errorMessage) { 286 System.err.println("ERROR: " + errorMessage); 287 System.err.println( 288 "Usage: hbase rowcounter [options] <tablename> " + "[--starttime=<start> --endtime=<end>] " 289 + "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [<column1> <column2>...]"); 290 System.err.println("For performance consider the following options:\n" 291 + "-Dhbase.client.scanner.caching=100\n" + "-Dmapreduce.map.speculative=false"); 292 } 293 294 private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(String arg) { 295 final List<String> rangesSplit = Splitter.on(";").splitToList(arg); 296 final List<MultiRowRangeFilter.RowRange> rangeList = new ArrayList<>(); 297 for (String range : rangesSplit) { 298 if (range != null && !range.isEmpty()) { 299 List<String> startEnd = Splitter.on(",").splitToList(range); 300 if (startEnd.size() != 2 || startEnd.get(1).contains(",")) { 301 throw new IllegalArgumentException("Wrong range specification: " + range); 302 } 303 String startKey = startEnd.get(0); 304 String endKey = startEnd.get(1); 305 rangeList.add(new MultiRowRangeFilter.RowRange(Bytes.toBytesBinary(startKey), true, 306 Bytes.toBytesBinary(endKey), false)); 307 } 308 } 309 return rangeList; 310 } 311 312 /** 313 * Sets filter {@link FilterBase} to the {@link Scan} instance. If provided rowRangeList contains 314 * more than one element, method sets filter which is instance of {@link MultiRowRangeFilter}. 315 * Otherwise, method sets filter which is instance of {@link FirstKeyOnlyFilter}. If rowRangeList 316 * contains exactly one element, startRow and stopRow are set to the scan. 317 */ 318 private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList, 319 boolean countDeleteMarkers) { 320 final int size = rowRangeList == null ? 0 : rowRangeList.size(); 321 // all cells will be needed if --countDeleteMarkers flag is set, hence, skipping filter 322 if (size <= 1 && !countDeleteMarkers) { 323 scan.setFilter(new FirstKeyOnlyFilter()); 324 } 325 if (size == 1) { 326 MultiRowRangeFilter.RowRange range = rowRangeList.get(0); 327 scan.withStartRow(range.getStartRow()); // inclusive 328 scan.withStopRow(range.getStopRow()); // exclusive 329 } else if (size > 1) { 330 scan.setFilter(new MultiRowRangeFilter(rowRangeList)); 331 } 332 } 333 334 @Override 335 protected void printUsage() { 336 StringBuilder footerBuilder = new StringBuilder(); 337 footerBuilder.append("For performance, consider the following configuration properties:\n"); 338 footerBuilder.append("-Dhbase.client.scanner.caching=100\n"); 339 footerBuilder.append("-Dmapreduce.map.speculative=false\n"); 340 printUsage("hbase rowcounter <tablename> [options] [<column1> <column2>...]", "Options:", 341 footerBuilder.toString()); 342 } 343 344 @Override 345 protected void printUsage(final String usageStr, final String usageHeader, 346 final String usageFooter) { 347 HelpFormatter helpFormatter = new HelpFormatter(); 348 helpFormatter.setWidth(120); 349 helpFormatter.setOptionComparator(new AbstractHBaseTool.OptionsOrderComparator()); 350 helpFormatter.setLongOptSeparator("="); 351 helpFormatter.printHelp(usageStr, usageHeader, options, usageFooter); 352 } 353 354 @Override 355 protected void addOptions() { 356 Option startTimeOption = Option.builder(null).valueSeparator('=').hasArg(true) 357 .desc("starting time filter to start counting rows from.").longOpt(OPT_START_TIME).build(); 358 Option endTimeOption = Option.builder(null).valueSeparator('=').hasArg(true) 359 .desc("end time filter limit, to only count rows up to this timestamp.").longOpt(OPT_END_TIME) 360 .build(); 361 Option rangeOption = Option.builder(null).valueSeparator('=').hasArg(true) 362 .desc("[startKey],[endKey][;[startKey],[endKey]...]]").longOpt(OPT_RANGE).build(); 363 Option expectedOption = Option.builder(null).valueSeparator('=').hasArg(true) 364 .desc("expected number of rows to be count.").longOpt(OPT_EXPECTED_COUNT).build(); 365 Option countDeleteMarkersOption = Option.builder(null).hasArg(false) 366 .desc("counts the number of Delete Markers of all types, i.e. " 367 + "(DELETE, DELETE_COLUMN, DELETE_FAMILY, DELETE_FAMILY_VERSION)") 368 .longOpt(OPT_COUNT_DELETE_MARKERS).build(); 369 addOption(startTimeOption); 370 addOption(endTimeOption); 371 addOption(rangeOption); 372 addOption(expectedOption); 373 addOption(countDeleteMarkersOption); 374 } 375 376 @Override 377 protected void processOptions(CommandLine cmd) throws IllegalArgumentException { 378 this.tableName = cmd.getArgList().get(0); 379 if (cmd.getOptionValue(OPT_RANGE) != null) { 380 this.rowRangeList = parseRowRangeParameter(cmd.getOptionValue(OPT_RANGE)); 381 } 382 this.endTime = cmd.getOptionValue(OPT_END_TIME) == null 383 ? HConstants.LATEST_TIMESTAMP 384 : Long.parseLong(cmd.getOptionValue(OPT_END_TIME)); 385 this.expectedCount = cmd.getOptionValue(OPT_EXPECTED_COUNT) == null 386 ? Long.MIN_VALUE 387 : Long.parseLong(cmd.getOptionValue(OPT_EXPECTED_COUNT)); 388 this.startTime = cmd.getOptionValue(OPT_START_TIME) == null 389 ? 0 390 : Long.parseLong(cmd.getOptionValue(OPT_START_TIME)); 391 this.countDeleteMarkers = cmd.hasOption(OPT_COUNT_DELETE_MARKERS); 392 393 for (int i = 1; i < cmd.getArgList().size(); i++) { 394 String argument = cmd.getArgList().get(i); 395 if (!argument.startsWith("-")) { 396 this.columns.add(argument); 397 } 398 } 399 400 if (endTime < startTime) { 401 throw new IllegalArgumentException( 402 "--endtime=" + endTime + " needs to be greater than --starttime=" + startTime); 403 } 404 } 405 406 @Override 407 protected void processOldArgs(List<String> args) { 408 List<String> copiedArgs = new ArrayList<>(args); 409 args.removeAll(copiedArgs); 410 for (String arg : copiedArgs) { 411 if (arg.startsWith("-") && arg.contains("=")) { 412 String[] kv = arg.split("="); 413 args.add(kv[0]); 414 args.add(kv[1]); 415 } else { 416 args.add(arg); 417 } 418 } 419 } 420 421 @Override 422 protected int doWork() throws Exception { 423 job = createSubmittableJob(getConf()); 424 if (job == null) { 425 return -1; 426 } 427 boolean success = job.waitForCompletion(true); 428 final long expectedCount = getConf().getLong(EXPECTED_COUNT_KEY, -1); 429 if (success && expectedCount != -1) { 430 final Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS); 431 success = expectedCount == counter.getValue(); 432 if (!success) { 433 LOG.error("Failing job because count of '" + counter.getValue() 434 + "' does not match expected count of '" + expectedCount + "'"); 435 } 436 } 437 return (success ? 0 : 1); 438 } 439 440 /** 441 * Main entry point. 442 * @param args The command line parameters. 443 * @throws Exception When running the job fails. 444 */ 445 public static void main(String[] args) throws Exception { 446 new RowCounter().doStaticMain(args); 447 } 448 449 static class RowCounterCommandLineParser extends BasicParser { 450 451 @Override 452 protected void checkRequiredOptions() throws MissingOptionException { 453 if (this.cmd.getArgList().size() < 1 || this.cmd.getArgList().get(0).startsWith("-")) { 454 throw new MissingOptionException("First argument must be a valid table name."); 455 } 456 } 457 } 458 459 @Override 460 protected CommandLineParser newParser() { 461 return new RowCounterCommandLineParser(); 462 } 463 464 @RestrictedApi(explanation = "Only visible for testing", link = "", 465 allowedOnPath = ".*/src/test/.*") 466 Job getMapReduceJob() { 467 return job; 468 } 469 470}