001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.DataInput; 023import java.io.DataOutput; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Map; 028import java.util.Optional; 029import java.util.Set; 030import java.util.concurrent.ThreadLocalRandom; 031import java.util.concurrent.atomic.AtomicLong; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.HBaseTestingUtil; 038import org.apache.hadoop.hbase.IntegrationTestBase; 039import org.apache.hadoop.hbase.IntegrationTestingUtility; 040import org.apache.hadoop.hbase.KeyValue; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.Admin; 043import org.apache.hadoop.hbase.client.Connection; 044import org.apache.hadoop.hbase.client.ConnectionFactory; 045import org.apache.hadoop.hbase.client.Consistency; 046import org.apache.hadoop.hbase.client.RegionLocator; 047import org.apache.hadoop.hbase.client.Result; 048import org.apache.hadoop.hbase.client.Scan; 049import org.apache.hadoop.hbase.client.TableDescriptor; 050import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 051import org.apache.hadoop.hbase.coprocessor.ObserverContext; 052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 054import org.apache.hadoop.hbase.coprocessor.RegionObserver; 055import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 056import org.apache.hadoop.hbase.regionserver.InternalScanner; 057import org.apache.hadoop.hbase.testclassification.IntegrationTests; 058import org.apache.hadoop.hbase.tool.BulkLoadHFiles; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 061import org.apache.hadoop.hbase.util.RegionSplitter; 062import org.apache.hadoop.io.LongWritable; 063import org.apache.hadoop.io.NullWritable; 064import org.apache.hadoop.io.Writable; 065import org.apache.hadoop.io.WritableComparable; 066import org.apache.hadoop.io.WritableComparator; 067import org.apache.hadoop.io.WritableUtils; 068import org.apache.hadoop.mapreduce.InputFormat; 069import org.apache.hadoop.mapreduce.InputSplit; 070import org.apache.hadoop.mapreduce.Job; 071import org.apache.hadoop.mapreduce.JobContext; 072import org.apache.hadoop.mapreduce.Mapper; 073import org.apache.hadoop.mapreduce.Partitioner; 074import org.apache.hadoop.mapreduce.RecordReader; 075import org.apache.hadoop.mapreduce.Reducer; 076import org.apache.hadoop.mapreduce.TaskAttemptContext; 077import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 078import org.apache.hadoop.util.StringUtils; 079import org.apache.hadoop.util.ToolRunner; 080import org.junit.Test; 081import org.junit.experimental.categories.Category; 082import org.slf4j.Logger; 083import org.slf4j.LoggerFactory; 084 085import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 086import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 087import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 088 089/** 090 * Test Bulk Load and MR on a distributed cluster. It starts an MR job that creates linked chains 091 * The format of rows is like this: Row Key -> Long L:<< Chain Id >> -> Row Key of the next link in 092 * the chain S:<< Chain Id >> -> The step in the chain that his link is. D:<< Chain Id >> -> Random 093 * Data. All chains start on row 0. All rk's are > 0. After creating the linked lists they are 094 * walked over using a TableMapper based Mapreduce Job. There are a few options exposed: 095 * hbase.IntegrationTestBulkLoad.chainLength The number of rows that will be part of each and every 096 * chain. hbase.IntegrationTestBulkLoad.numMaps The number of mappers that will be run. Each mapper 097 * creates on linked list chain. hbase.IntegrationTestBulkLoad.numImportRounds How many jobs will be 098 * run to create linked lists. hbase.IntegrationTestBulkLoad.tableName The name of the table. 099 * hbase.IntegrationTestBulkLoad.replicaCount How many region replicas to configure for the table 100 * under test. 101 */ 102@Category(IntegrationTests.class) 103public class IntegrationTestBulkLoad extends IntegrationTestBase { 104 105 private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBulkLoad.class); 106 107 private static final byte[] CHAIN_FAM = Bytes.toBytes("L"); 108 private static final byte[] SORT_FAM = Bytes.toBytes("S"); 109 private static final byte[] DATA_FAM = Bytes.toBytes("D"); 110 111 private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength"; 112 private static int CHAIN_LENGTH = 500000; 113 114 private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps"; 115 private static int NUM_MAPS = 1; 116 117 private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds"; 118 private static int NUM_IMPORT_ROUNDS = 1; 119 120 private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum"; 121 122 private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName"; 123 private static String TABLE_NAME = "IntegrationTestBulkLoad"; 124 125 private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount"; 126 private static int NUM_REPLICA_COUNT_DEFAULT = 1; 127 128 private static final String OPT_LOAD = "load"; 129 private static final String OPT_CHECK = "check"; 130 131 private boolean load = false; 132 private boolean check = false; 133 134 public static class SlowMeCoproScanOperations implements RegionCoprocessor, RegionObserver { 135 static final AtomicLong sleepTime = new AtomicLong(2000); 136 AtomicLong countOfNext = new AtomicLong(0); 137 AtomicLong countOfOpen = new AtomicLong(0); 138 139 public SlowMeCoproScanOperations() { 140 } 141 142 @Override 143 public Optional<RegionObserver> getRegionObserver() { 144 return Optional.of(this); 145 } 146 147 @Override 148 public void preScannerOpen(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 149 final Scan scan) throws IOException { 150 if (countOfOpen.incrementAndGet() == 2) { // slowdown openScanner randomly 151 slowdownCode(e); 152 } 153 } 154 155 @Override 156 public boolean preScannerNext(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 157 final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore) 158 throws IOException { 159 // this will slow down a certain next operation if the conditions are met. The slowness 160 // will allow the call to go to a replica 161 countOfNext.incrementAndGet(); 162 if (countOfNext.get() == 0 || countOfNext.get() == 4) { 163 slowdownCode(e); 164 } 165 return true; 166 } 167 168 protected void slowdownCode(final ObserverContext<? extends RegionCoprocessorEnvironment> e) { 169 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) { 170 try { 171 if (sleepTime.get() > 0) { 172 LOG.info("Sleeping for " + sleepTime.get() + " ms"); 173 Thread.sleep(sleepTime.get()); 174 } 175 } catch (InterruptedException e1) { 176 LOG.error(e1.toString(), e1); 177 } 178 } 179 } 180 } 181 182 /** 183 * Modify table {@code getTableName()} to carry {@link SlowMeCoproScanOperations}. 184 */ 185 private void installSlowingCoproc() throws IOException, InterruptedException { 186 int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT); 187 if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return; 188 189 TableName t = getTablename(); 190 Admin admin = util.getAdmin(); 191 TableDescriptor desc = admin.getDescriptor(t); 192 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(desc); 193 builder.setCoprocessor(SlowMeCoproScanOperations.class.getName()); 194 admin.modifyTable(builder.build()); 195 } 196 197 @Test 198 public void testBulkLoad() throws Exception { 199 runLoad(); 200 installSlowingCoproc(); 201 runCheckWithRetry(); 202 } 203 204 public void runLoad() throws Exception { 205 setupTable(); 206 int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS); 207 LOG.info("Running load with numIterations:" + numImportRounds); 208 for (int i = 0; i < numImportRounds; i++) { 209 runLinkedListMRJob(i); 210 } 211 } 212 213 private byte[][] getSplits(int numRegions) { 214 RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit(); 215 split.setFirstRow(Bytes.toBytes(0L)); 216 split.setLastRow(Bytes.toBytes(Long.MAX_VALUE)); 217 return split.split(numRegions); 218 } 219 220 private void setupTable() throws IOException, InterruptedException { 221 if (util.getAdmin().tableExists(getTablename())) { 222 util.deleteTable(getTablename()); 223 } 224 225 util.createTable(getTablename(), new byte[][] { CHAIN_FAM, SORT_FAM, DATA_FAM }, getSplits(16)); 226 227 int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT); 228 if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return; 229 230 TableName t = getTablename(); 231 HBaseTestingUtil.setReplicas(util.getAdmin(), t, replicaCount); 232 } 233 234 private void runLinkedListMRJob(int iteration) throws Exception { 235 String jobName = 236 IntegrationTestBulkLoad.class.getSimpleName() + " - " + EnvironmentEdgeManager.currentTime(); 237 Configuration conf = new Configuration(util.getConfiguration()); 238 Path p = null; 239 if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) { 240 p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration); 241 } else { 242 p = new Path(conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY)); 243 } 244 245 conf.setBoolean("mapreduce.map.speculative", false); 246 conf.setBoolean("mapreduce.reduce.speculative", false); 247 conf.setInt(ROUND_NUM_KEY, iteration); 248 249 Job job = new Job(conf); 250 251 job.setJobName(jobName); 252 253 // set the input format so that we can create map tasks with no data input. 254 job.setInputFormatClass(ITBulkLoadInputFormat.class); 255 256 // Set the mapper classes. 257 job.setMapperClass(LinkedListCreationMapper.class); 258 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 259 job.setMapOutputValueClass(KeyValue.class); 260 261 // Use the identity reducer 262 // So nothing to do here. 263 264 // Set this jar. 265 job.setJarByClass(getClass()); 266 267 // Set where to place the hfiles. 268 FileOutputFormat.setOutputPath(job, p); 269 try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin(); 270 RegionLocator regionLocator = conn.getRegionLocator(getTablename())) { 271 // Configure the partitioner and other things needed for HFileOutputFormat. 272 HFileOutputFormat2.configureIncrementalLoad(job, admin.getDescriptor(getTablename()), 273 regionLocator); 274 // Run the job making sure it works. 275 assertEquals(true, job.waitForCompletion(true)); 276 } 277 // Create a new loader. 278 BulkLoadHFiles loader = BulkLoadHFiles.create(conf); 279 // Load the HFiles in. 280 loader.bulkLoad(getTablename(), p); 281 // Delete the files. 282 util.getTestFileSystem().delete(p, true); 283 } 284 285 public static class EmptySplit extends InputSplit implements Writable { 286 @Override 287 public void write(DataOutput out) throws IOException { 288 } 289 290 @Override 291 public void readFields(DataInput in) throws IOException { 292 } 293 294 @Override 295 public long getLength() { 296 return 0L; 297 } 298 299 @Override 300 public String[] getLocations() { 301 return new String[0]; 302 } 303 } 304 305 public static class FixedRecordReader<K, V> extends RecordReader<K, V> { 306 private int index = -1; 307 private K[] keys; 308 private V[] values; 309 310 public FixedRecordReader(K[] keys, V[] values) { 311 this.keys = keys; 312 this.values = values; 313 } 314 315 @Override 316 public void initialize(InputSplit split, TaskAttemptContext context) 317 throws IOException, InterruptedException { 318 } 319 320 @Override 321 public boolean nextKeyValue() throws IOException, InterruptedException { 322 return ++index < keys.length; 323 } 324 325 @Override 326 public K getCurrentKey() throws IOException, InterruptedException { 327 return keys[index]; 328 } 329 330 @Override 331 public V getCurrentValue() throws IOException, InterruptedException { 332 return values[index]; 333 } 334 335 @Override 336 public float getProgress() throws IOException, InterruptedException { 337 return (float) index / keys.length; 338 } 339 340 @Override 341 public void close() throws IOException { 342 } 343 } 344 345 public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> { 346 @Override 347 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { 348 int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS); 349 ArrayList<InputSplit> ret = new ArrayList<>(numSplits); 350 for (int i = 0; i < numSplits; ++i) { 351 ret.add(new EmptySplit()); 352 } 353 return ret; 354 } 355 356 @Override 357 public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split, 358 TaskAttemptContext context) throws IOException, InterruptedException { 359 int taskId = context.getTaskAttemptID().getTaskID().getId(); 360 int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS); 361 int numIterations = 362 context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS); 363 int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0); 364 365 taskId = taskId + iteration * numMapTasks; 366 numMapTasks = numMapTasks * numIterations; 367 368 long chainId = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); 369 chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per 370 // task and across iterations 371 LongWritable[] keys = new LongWritable[] { new LongWritable(chainId) }; 372 373 return new FixedRecordReader<>(keys, keys); 374 } 375 } 376 377 /** 378 * Mapper that creates a linked list of KeyValues. Each map task generates one linked list. All 379 * lists start on row key 0L. All lists should be CHAIN_LENGTH long. 380 */ 381 public static class LinkedListCreationMapper 382 extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> { 383 384 @Override 385 protected void map(LongWritable key, LongWritable value, Context context) 386 throws IOException, InterruptedException { 387 long chainId = value.get(); 388 LOG.info("Starting mapper with chainId:" + chainId); 389 390 byte[] chainIdArray = Bytes.toBytes(chainId); 391 long currentRow = 0; 392 393 long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH); 394 long nextRow = getNextRow(0, chainLength); 395 byte[] valueBytes = new byte[50]; 396 397 for (long i = 0; i < chainLength; i++) { 398 byte[] rk = Bytes.toBytes(currentRow); 399 400 // Next link in the chain. 401 KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow)); 402 // What link in the chain this is. 403 KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i)); 404 // Added data so that large stores are created. 405 Bytes.random(valueBytes); 406 KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray, valueBytes); 407 408 // Emit the key values. 409 context.write(new ImmutableBytesWritable(rk), linkKv); 410 context.write(new ImmutableBytesWritable(rk), sortKv); 411 context.write(new ImmutableBytesWritable(rk), dataKv); 412 // Move to the next row. 413 currentRow = nextRow; 414 nextRow = getNextRow(i + 1, chainLength); 415 } 416 } 417 418 /** Returns a unique row id within this chain for this index */ 419 private long getNextRow(long index, long chainLength) { 420 long nextRow = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); 421 // use significant bits from the random number, but pad with index to ensure it is unique 422 // this also ensures that we do not reuse row = 0 423 // row collisions from multiple mappers are fine, since we guarantee unique chainIds 424 nextRow = nextRow - (nextRow % chainLength) + index; 425 return nextRow; 426 } 427 } 428 429 /** 430 * Writable class used as the key to group links in the linked list. Used as the key emited from a 431 * pass over the table. 432 */ 433 public static class LinkKey implements WritableComparable<LinkKey> { 434 435 private Long chainId; 436 437 public Long getOrder() { 438 return order; 439 } 440 441 public Long getChainId() { 442 return chainId; 443 } 444 445 private Long order; 446 447 public LinkKey() { 448 } 449 450 public LinkKey(long chainId, long order) { 451 this.chainId = chainId; 452 this.order = order; 453 } 454 455 @Override 456 public int compareTo(LinkKey linkKey) { 457 int res = getChainId().compareTo(linkKey.getChainId()); 458 if (res == 0) { 459 res = getOrder().compareTo(linkKey.getOrder()); 460 } 461 return res; 462 } 463 464 @Override 465 public void write(DataOutput dataOutput) throws IOException { 466 WritableUtils.writeVLong(dataOutput, chainId); 467 WritableUtils.writeVLong(dataOutput, order); 468 } 469 470 @Override 471 public void readFields(DataInput dataInput) throws IOException { 472 chainId = WritableUtils.readVLong(dataInput); 473 order = WritableUtils.readVLong(dataInput); 474 } 475 } 476 477 /** 478 * Writable used as the value emitted from a pass over the hbase table. 479 */ 480 public static class LinkChain implements WritableComparable<LinkChain> { 481 482 public Long getNext() { 483 return next; 484 } 485 486 public Long getRk() { 487 return rk; 488 } 489 490 public LinkChain() { 491 } 492 493 public LinkChain(Long rk, Long next) { 494 this.rk = rk; 495 this.next = next; 496 } 497 498 private Long rk; 499 private Long next; 500 501 @Override 502 public int compareTo(LinkChain linkChain) { 503 int res = getRk().compareTo(linkChain.getRk()); 504 if (res == 0) { 505 res = getNext().compareTo(linkChain.getNext()); 506 } 507 return res; 508 } 509 510 @Override 511 public void write(DataOutput dataOutput) throws IOException { 512 WritableUtils.writeVLong(dataOutput, rk); 513 WritableUtils.writeVLong(dataOutput, next); 514 } 515 516 @Override 517 public void readFields(DataInput dataInput) throws IOException { 518 rk = WritableUtils.readVLong(dataInput); 519 next = WritableUtils.readVLong(dataInput); 520 } 521 } 522 523 /** 524 * Class to figure out what partition to send a link in the chain to. This is based upon the 525 * linkKey's ChainId. 526 */ 527 public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> { 528 @Override 529 public int getPartition(LinkKey linkKey, LinkChain linkChain, int numPartitions) { 530 int hash = linkKey.getChainId().hashCode(); 531 return Math.abs(hash % numPartitions); 532 } 533 } 534 535 /** 536 * Comparator used to figure out if a linkKey should be grouped together. This is based upon the 537 * linkKey's ChainId. 538 */ 539 public static class NaturalKeyGroupingComparator extends WritableComparator { 540 541 protected NaturalKeyGroupingComparator() { 542 super(LinkKey.class, true); 543 } 544 545 @Override 546 public int compare(WritableComparable w1, WritableComparable w2) { 547 LinkKey k1 = (LinkKey) w1; 548 LinkKey k2 = (LinkKey) w2; 549 550 return k1.getChainId().compareTo(k2.getChainId()); 551 } 552 } 553 554 /** 555 * Comparator used to order linkKeys so that they are passed to a reducer in order. This is based 556 * upon linkKey ChainId and Order. 557 */ 558 public static class CompositeKeyComparator extends WritableComparator { 559 560 protected CompositeKeyComparator() { 561 super(LinkKey.class, true); 562 } 563 564 @Override 565 public int compare(WritableComparable w1, WritableComparable w2) { 566 LinkKey k1 = (LinkKey) w1; 567 LinkKey k2 = (LinkKey) w2; 568 569 return k1.compareTo(k2); 570 } 571 } 572 573 /** 574 * Mapper to pass over the table. For every row there could be multiple chains that landed on this 575 * row. So emit a linkKey and value for each. 576 */ 577 public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> { 578 @Override 579 protected void map(ImmutableBytesWritable key, Result value, Context context) 580 throws IOException, InterruptedException { 581 long longRk = Bytes.toLong(value.getRow()); 582 583 for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) { 584 long chainId = Bytes.toLong(entry.getKey()); 585 long next = Bytes.toLong(entry.getValue()); 586 Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0); 587 long order = Bytes.toLong(CellUtil.cloneValue(c)); 588 context.write(new LinkKey(chainId, order), new LinkChain(longRk, next)); 589 } 590 } 591 } 592 593 /** 594 * Class that does the actual checking of the links. All links in the chain should be grouped and 595 * sorted when sent to this class. Then the chain will be traversed making sure that no link is 596 * missing and that the chain is the correct length. This will throw an exception if anything is 597 * not correct. That causes the job to fail if any data is corrupt. 598 */ 599 public static class LinkedListCheckingReducer 600 extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> { 601 @Override 602 protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context) 603 throws java.io.IOException, java.lang.InterruptedException { 604 long next = -1L; 605 long prev = -1L; 606 long count = 0L; 607 608 for (LinkChain lc : values) { 609 610 if (next == -1) { 611 if (lc.getRk() != 0L) { 612 String msg = "Chains should all start at rk 0, but read rk " + lc.getRk() + ". Chain:" 613 + key.chainId + ", order:" + key.order; 614 logError(msg, context); 615 throw new RuntimeException(msg); 616 } 617 next = lc.getNext(); 618 } else { 619 if (next != lc.getRk()) { 620 String msg = "Missing a link in the chain. Prev rk " + prev + " was, expecting " + next 621 + " but got " + lc.getRk() + ". Chain:" + key.chainId + ", order:" + key.order; 622 logError(msg, context); 623 throw new RuntimeException(msg); 624 } 625 prev = lc.getRk(); 626 next = lc.getNext(); 627 } 628 count++; 629 } 630 631 int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH); 632 if (count != expectedChainLen) { 633 String msg = "Chain wasn't the correct length. Expected " + expectedChainLen + " got " 634 + count + ". Chain:" + key.chainId + ", order:" + key.order; 635 logError(msg, context); 636 throw new RuntimeException(msg); 637 } 638 } 639 640 private static void logError(String msg, Context context) throws IOException { 641 TableName table = getTableName(context.getConfiguration()); 642 643 LOG.error("Failure in chain verification: " + msg); 644 try (Connection connection = ConnectionFactory.createConnection(context.getConfiguration()); 645 Admin admin = connection.getAdmin()) { 646 LOG.error("cluster metrics:\n" + admin.getClusterMetrics()); 647 LOG.error("table regions:\n" + Joiner.on("\n").join(admin.getRegions(table))); 648 } 649 } 650 } 651 652 private void runCheckWithRetry() 653 throws IOException, ClassNotFoundException, InterruptedException { 654 try { 655 runCheck(); 656 } catch (Throwable t) { 657 LOG.warn("Received " + StringUtils.stringifyException(t)); 658 LOG.warn("Running the check MR Job again to see whether an ephemeral problem or not"); 659 runCheck(); 660 throw t; // we should still fail the test even if second retry succeeds 661 } 662 // everything green 663 } 664 665 /** 666 * After adding data to the table start a mr job to 667 */ 668 private void runCheck() throws IOException, ClassNotFoundException, InterruptedException { 669 LOG.info("Running check"); 670 Configuration conf = getConf(); 671 String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime(); 672 Path p = util.getDataTestDirOnTestFS(jobName); 673 674 Job job = new Job(conf); 675 job.setJarByClass(getClass()); 676 job.setJobName(jobName); 677 678 job.setPartitionerClass(NaturalKeyPartitioner.class); 679 job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class); 680 job.setSortComparatorClass(CompositeKeyComparator.class); 681 682 Scan scan = new Scan(); 683 scan.addFamily(CHAIN_FAM); 684 scan.addFamily(SORT_FAM); 685 scan.readVersions(1); 686 scan.setCacheBlocks(false); 687 scan.setBatch(1000); 688 689 int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT); 690 if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) { 691 scan.setConsistency(Consistency.TIMELINE); 692 } 693 694 TableMapReduceUtil.initTableMapperJob(getTablename().getName(), scan, 695 LinkedListCheckingMapper.class, LinkKey.class, LinkChain.class, job); 696 697 job.setReducerClass(LinkedListCheckingReducer.class); 698 job.setOutputKeyClass(NullWritable.class); 699 job.setOutputValueClass(NullWritable.class); 700 701 FileOutputFormat.setOutputPath(job, p); 702 703 assertEquals(true, job.waitForCompletion(true)); 704 705 // Delete the files. 706 util.getTestFileSystem().delete(p, true); 707 } 708 709 @Override 710 public void setUpCluster() throws Exception { 711 util = getTestingUtil(getConf()); 712 util.initializeCluster(1); 713 int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT); 714 if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) { 715 LOG.debug("Region Replicas enabled: " + replicaCount); 716 } 717 718 // Scale this up on a real cluster 719 if (util.isDistributedCluster()) { 720 util.getConfiguration().setIfUnset(NUM_MAPS_KEY, 721 Integer.toString(util.getAdmin().getRegionServers().size() * 10)); 722 util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5"); 723 } else { 724 util.startMiniMapReduceCluster(); 725 } 726 } 727 728 @Override 729 protected void addOptions() { 730 super.addOptions(); 731 super.addOptNoArg(OPT_CHECK, "Run check only"); 732 super.addOptNoArg(OPT_LOAD, "Run load only"); 733 } 734 735 @Override 736 protected void processOptions(CommandLine cmd) { 737 super.processOptions(cmd); 738 check = cmd.hasOption(OPT_CHECK); 739 load = cmd.hasOption(OPT_LOAD); 740 } 741 742 @Override 743 public int runTestFromCommandLine() throws Exception { 744 if (load) { 745 runLoad(); 746 } else if (check) { 747 installSlowingCoproc(); 748 runCheckWithRetry(); 749 } else { 750 testBulkLoad(); 751 } 752 return 0; 753 } 754 755 @Override 756 public TableName getTablename() { 757 return getTableName(getConf()); 758 } 759 760 public static TableName getTableName(Configuration conf) { 761 return TableName.valueOf(conf.get(TABLE_NAME_KEY, TABLE_NAME)); 762 } 763 764 @Override 765 protected Set<String> getColumnFamilies() { 766 return Sets.newHashSet(Bytes.toString(CHAIN_FAM), Bytes.toString(DATA_FAM), 767 Bytes.toString(SORT_FAM)); 768 } 769 770 public static void main(String[] args) throws Exception { 771 Configuration conf = HBaseConfiguration.create(); 772 IntegrationTestingUtility.setUseDistributedCluster(conf); 773 int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args); 774 System.exit(status); 775 } 776}