/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.BasicParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;

/**
 * A job with just a map phase to count rows. The mapper writes no output records; it increments
 * the {@code ROWS} counter once for every row the scan hands to it.
 */
@InterfaceAudience.Public
public class RowCounter extends AbstractHBaseTool {

  private static final Logger LOG = LoggerFactory.getLogger(RowCounter.class);

  /** Name of this 'program'. */
  static final String NAME = "rowcounter";

  private static final String JOB_NAME_CONF_KEY = "mapreduce.job.name";
  private static final String EXPECTED_COUNT_KEY = RowCounter.class.getName() + ".expected_count";

  private static final String OPT_START_TIME = "starttime";
  private static final String OPT_END_TIME = "endtime";
  private static final String OPT_RANGE = "range";
  private static final String OPT_EXPECTED_COUNT = "expectedCount";

  private String tableName;
  private List<MultiRowRangeFilter.RowRange> rowRangeList;
  private long startTime;
  private long endTime;
  private long expectedCount;
  private List<String> columns = new ArrayList<>();

  /**
   * Mapper that runs the count.
   */
  static class RowCounterMapper extends TableMapper<ImmutableBytesWritable, Result> {

    /** Counter enumeration to count the actual rows. */
    public enum Counters {
      ROWS
    }

    /**
     * Maps the data.
     * @param row     The current table row key.
     * @param values  The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     * @see org.apache.hadoop.mapreduce.Mapper#map(Object, Object, Context)
     */
    @Override
    public void map(ImmutableBytesWritable row, Result values, Context context)
      throws IOException {
      // Count every row containing data, whether it's in qualifiers or values
      context.getCounter(Counters.ROWS).increment(1);
    }
  }

  /**
   * Sets up the actual job from the state populated by {@link #processOptions(CommandLine)}
   * (table name, row ranges, time range, columns and expected count).
   * @param conf The current configuration.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(Configuration conf) throws IOException {
    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    setScanFilter(scan, rowRangeList);
    addColumnsToScan(scan, this.columns);
    scan.setTimeRange(startTime, endTime);

    Job job = newRowCounterJob(conf, tableName, scan);

    // doWork() reads the expected count back from the tool configuration, so it is stored on
    // 'conf' rather than on the job's own configuration.
    if (this.expectedCount >= 0) {
      conf.setLong(EXPECTED_COUNT_KEY, this.expectedCount);
    }
    return job;
  }

  /**
   * Sets up the actual job.
   * <p>
   * {@code args[0]} is the table name. Remaining arguments are either one of the switches
   * {@code --range=}, {@code --starttime=}, {@code --endtime=}, {@code --expected-count=}, or a
   * column name of the form {@code family} or {@code family:qualifier}. An end time of {@code 0}
   * (the default when {@code --endtime} is absent) means "no upper bound".
   * @param conf The current configuration.
   * @param args The command line parameters.
   * @return The newly created job, or {@code null} if the arguments are invalid (usage is printed
   *         to stderr in that case).
   * @throws IOException When setting up the job fails.
   * @deprecated as of release 2.3.0. Will be removed on 4.0.0. Please use main method instead.
   */
  @Deprecated
  public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
    if (args == null || args.length < 1) {
      printUsage("Wrong number of parameters: " + (args == null ? 0 : args.length));
      return null;
    }
    String tableName = args[0];
    List<MultiRowRangeFilter.RowRange> rowRangeList = null;
    long startTime = 0;
    long endTime = 0;

    StringBuilder sb = new StringBuilder();

    final String rangeSwitch = "--range=";
    final String startTimeArgKey = "--starttime=";
    final String endTimeArgKey = "--endtime=";
    final String expectedCountArg = "--expected-count=";

    // First argument is table name, starting from second
    for (int i = 1; i < args.length; i++) {
      final String arg = args[i];
      if (arg.startsWith(rangeSwitch)) {
        try {
          // Take the value from the argument being examined; the switch may appear at any index.
          rowRangeList = parseRowRangeParameter(arg.substring(rangeSwitch.length()));
        } catch (IllegalArgumentException e) {
          // Report why the range was rejected instead of failing silently.
          printUsage(e.getMessage());
          return null;
        }
        continue;
      }
      if (arg.startsWith(startTimeArgKey)) {
        startTime = Long.parseLong(arg.substring(startTimeArgKey.length()));
        continue;
      }
      if (arg.startsWith(endTimeArgKey)) {
        endTime = Long.parseLong(arg.substring(endTimeArgKey.length()));
        continue;
      }
      if (arg.startsWith(expectedCountArg)) {
        conf.setLong(EXPECTED_COUNT_KEY, Long.parseLong(arg.substring(expectedCountArg.length())));
        continue;
      }
      // if no switch, assume column names
      sb.append(arg);
      sb.append(" ");
    }

    // Resolve the "unset" end time before validating, otherwise passing only --starttime would
    // be rejected because 0 < startTime.
    final long effectiveEndTime = endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime;
    if (effectiveEndTime < startTime) {
      printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
      return null;
    }

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    setScanFilter(scan, rowRangeList);
    if (sb.length() > 0) {
      for (String columnName : sb.toString().trim().split(" ")) {
        addColumnToScan(scan, columnName);
      }
    }
    scan.setTimeRange(startTime, effectiveEndTime);
    return newRowCounterJob(conf, tableName, scan);
  }

  /**
   * Creates the map-only counting job for the given table and fully configured scan.
   */
  private static Job newRowCounterJob(Configuration conf, String tableName, Scan scan)
    throws IOException {
    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(RowCounter.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    TableMapReduceUtil.initTableMapperJob(tableName, scan, RowCounterMapper.class,
      ImmutableBytesWritable.class, Result.class, job);
    job.setNumReduceTasks(0);
    return job;
  }

  /**
   * Adds every entry of {@code columnNames} to the scan, see {@link #addColumnToScan}.
   */
  private static void addColumnsToScan(Scan scan, Iterable<String> columnNames) {
    for (String columnName : columnNames) {
      addColumnToScan(scan, columnName);
    }
  }

  /**
   * Restricts the scan to {@code family:qualifier}, or to the whole family when the part after
   * the first ':' is blank or absent.
   */
  private static void addColumnToScan(Scan scan, String columnName) {
    String family = StringUtils.substringBefore(columnName, ":");
    String qualifier = StringUtils.substringAfter(columnName, ":");
    if (StringUtils.isBlank(qualifier)) {
      scan.addFamily(Bytes.toBytes(family));
    } else {
      scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
    }
  }

  /**
   * Prints the given error message followed by the usage text to stderr. Note that we don't
   * document --expected-count, because it's intended for test.
   */
  private static void printUsage(String errorMessage) {
    System.err.println("ERROR: " + errorMessage);
    System.err.println(
      "Usage: hbase rowcounter [options] <tablename> " + "[--starttime=<start> --endtime=<end>] "
        + "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [<column1> <column2>...]");
    System.err.println("For performance consider the following options:\n"
      + "-Dhbase.client.scanner.caching=100\n" + "-Dmapreduce.map.speculative=false");
  }

  /**
   * Parses a range specification of the form {@code start,end[;start,end...]}. Empty segments
   * between ';' separators are skipped. Each range is start-inclusive and end-exclusive, and keys
   * are decoded with {@link Bytes#toBytesBinary(String)}.
   * @throws IllegalArgumentException if a non-empty segment does not consist of exactly two
   *                                  comma-separated parts.
   */
  private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(String arg) {
    final List<String> rangesSplit = Splitter.on(";").splitToList(arg);
    final List<MultiRowRangeFilter.RowRange> rangeList = new ArrayList<>();
    for (String range : rangesSplit) {
      if (range == null || range.isEmpty()) {
        continue;
      }
      List<String> startEnd = Splitter.on(",").splitToList(range);
      // Parts produced by splitting on ',' can never contain ',' themselves, so checking the
      // number of parts is sufficient.
      if (startEnd.size() != 2) {
        throw new IllegalArgumentException("Wrong range specification: " + range);
      }
      String startKey = startEnd.get(0);
      String endKey = startEnd.get(1);
      rangeList.add(new MultiRowRangeFilter.RowRange(Bytes.toBytesBinary(startKey), true,
        Bytes.toBytesBinary(endKey), false));
    }
    return rangeList;
  }

  /**
   * Sets filter {@link FilterBase} to the {@link Scan} instance. If provided rowRangeList contains
   * more than one element, method sets filter which is instance of {@link MultiRowRangeFilter}.
   * Otherwise, method sets filter which is instance of {@link FirstKeyOnlyFilter}. If rowRangeList
   * contains exactly one element, startRow and stopRow are set to the scan.
   */
  private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList) {
    final int size = rowRangeList == null ? 0 : rowRangeList.size();
    if (size > 1) {
      scan.setFilter(new MultiRowRangeFilter(rowRangeList));
      return;
    }
    scan.setFilter(new FirstKeyOnlyFilter());
    if (size == 1) {
      MultiRowRangeFilter.RowRange range = rowRangeList.get(0);
      scan.setStartRow(range.getStartRow()); // inclusive
      scan.setStopRow(range.getStopRow()); // exclusive
    }
  }

  /**
   * Prints the tool's usage line, its options and a footer with performance-related
   * configuration hints.
   */
  @Override
  protected void printUsage() {
    StringBuilder footerBuilder = new StringBuilder();
    footerBuilder.append("For performance, consider the following configuration properties:\n");
    footerBuilder.append("-Dhbase.client.scanner.caching=100\n");
    footerBuilder.append("-Dmapreduce.map.speculative=false\n");
    printUsage("hbase rowcounter <tablename> [options] [<column1> <column2>...]", "Options:",
      footerBuilder.toString());
  }

  /**
   * Prints help with a 120-column width and '=' as the separator between a long option and its
   * value.
   */
  @Override
  protected void printUsage(final String usageStr, final String usageHeader,
    final String usageFooter) {
    HelpFormatter helpFormatter = new HelpFormatter();
    helpFormatter.setWidth(120);
    helpFormatter.setOptionComparator(new AbstractHBaseTool.OptionsOrderComparator());
    helpFormatter.setLongOptSeparator("=");
    helpFormatter.printHelp(usageStr, usageHeader, options, usageFooter);
  }

  /**
   * Registers the long options {@code starttime}, {@code endtime}, {@code range} and
   * {@code expectedCount}; each takes one argument.
   */
  @Override
  protected void addOptions() {
    Option startTimeOption = Option.builder(null).valueSeparator('=').hasArg(true)
      .desc("starting time filter to start counting rows from.").longOpt(OPT_START_TIME).build();
    Option endTimeOption = Option.builder(null).valueSeparator('=').hasArg(true)
      .desc("end time filter limit, to only count rows up to this timestamp.").longOpt(OPT_END_TIME)
      .build();
    Option rangeOption = Option.builder(null).valueSeparator('=').hasArg(true)
      .desc("[startKey],[endKey][;[startKey],[endKey]...]]").longOpt(OPT_RANGE).build();
    Option expectedOption = Option.builder(null).valueSeparator('=').hasArg(true)
      .desc("expected number of rows to be count.").longOpt(OPT_EXPECTED_COUNT).build();
    addOption(startTimeOption);
    addOption(endTimeOption);
    addOption(rangeOption);
    addOption(expectedOption);
  }

  /**
   * Copies the parsed command line into this tool's fields. The first positional argument is the
   * table name; remaining positional arguments not starting with '-' are column names. Defaults
   * when an option is absent: start time 0, end time {@link HConstants#LATEST_TIMESTAMP},
   * expected count {@link Long#MIN_VALUE} (treated as "not set" because it is negative).
   * @throws IllegalArgumentException if the end time is smaller than the start time, or a range or
   *                                  numeric option value cannot be parsed.
   */
  @Override
  protected void processOptions(CommandLine cmd) throws IllegalArgumentException {
    final List<String> positionalArgs = cmd.getArgList();
    this.tableName = positionalArgs.get(0);

    final String rangeValue = cmd.getOptionValue(OPT_RANGE);
    if (rangeValue != null) {
      this.rowRangeList = parseRowRangeParameter(rangeValue);
    }

    final String endTimeValue = cmd.getOptionValue(OPT_END_TIME);
    this.endTime = endTimeValue == null ? HConstants.LATEST_TIMESTAMP : Long.parseLong(endTimeValue);

    final String expectedCountValue = cmd.getOptionValue(OPT_EXPECTED_COUNT);
    this.expectedCount =
      expectedCountValue == null ? Long.MIN_VALUE : Long.parseLong(expectedCountValue);

    final String startTimeValue = cmd.getOptionValue(OPT_START_TIME);
    this.startTime = startTimeValue == null ? 0 : Long.parseLong(startTimeValue);

    for (int i = 1; i < positionalArgs.size(); i++) {
      String argument = positionalArgs.get(i);
      if (!argument.startsWith("-")) {
        this.columns.add(argument);
      }
    }

    if (endTime < startTime) {
      throw new IllegalArgumentException(
        "--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
    }
  }

  /**
   * Rewrites {@code args} in place so that every {@code -key=value} argument becomes the two
   * arguments {@code -key} and {@code value}. Only the first '=' separates key from value, so
   * values that contain '=' themselves (for example row keys) are preserved, and {@code -key=}
   * yields an empty value.
   */
  @Override
  protected void processOldArgs(List<String> args) {
    List<String> copiedArgs = new ArrayList<>(args);
    args.clear();
    for (String arg : copiedArgs) {
      if (arg.startsWith("-") && arg.contains("=")) {
        // Limit 2: split on the first '=' only and keep a trailing empty value.
        String[] kv = arg.split("=", 2);
        args.add(kv[0]);
        args.add(kv[1]);
      } else {
        args.add(arg);
      }
    }
  }

  /**
   * Runs the counting job and, when an expected count is present in the configuration, compares
   * it with the {@code ROWS} counter.
   * @return 0 on success, 1 if the job failed or the count did not match, -1 if no job was
   *         created.
   */
  @Override
  protected int doWork() throws Exception {
    Job job = createSubmittableJob(getConf());
    if (job == null) {
      return -1;
    }
    boolean success = job.waitForCompletion(true);
    final long expected = getConf().getLong(EXPECTED_COUNT_KEY, -1);
    if (success && expected != -1) {
      final Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS);
      success = expected == counter.getValue();
      if (!success) {
        LOG.error("Failing job because count of '{}' does not match expected count of '{}'",
          counter.getValue(), expected);
      }
    }
    return (success ? 0 : 1);
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    new RowCounter().doStaticMain(args);
  }

  /**
   * Parser whose only "required option" check is that the first positional argument exists and
   * does not start with '-', i.e. that a table name was supplied.
   */
  static class RowCounterCommandLineParser extends BasicParser {

    @Override
    protected void checkRequiredOptions() throws MissingOptionException {
      if (this.cmd.getArgList().size() < 1 || this.cmd.getArgList().get(0).startsWith("-")) {
        throw new MissingOptionException("First argument must be a valid table name.");
      }
    }
  }

  @Override
  protected CommandLineParser newParser() {
    return new RowCounterCommandLineParser();
  }

}