/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import static org.apache.hadoop.hbase.IntegrationTestingUtility.DEFAULT_REGIONS_PER_SERVER;
import static org.apache.hadoop.hbase.IntegrationTestingUtility.PRESPLIT_TEST_TABLE;
import static org.apache.hadoop.hbase.IntegrationTestingUtility.PRESPLIT_TEST_TABLE_KEY;
import static org.apache.hadoop.hbase.IntegrationTestingUtility.REGIONS_PER_SERVER_KEY;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionConfiguration;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.regionserver.FlushAllLargeStoresPolicy;
import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Random64;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

/**
 * <p>
 * This is an integration test borrowed from goraci, written by Keith Turner, which is in turn
 * inspired by the Accumulo test called continuous ingest (ci).
 * The original source code can be found here:
 * <ul>
 * <li>https://github.com/keith-turner/goraci</li>
 * <li>https://github.com/enis/goraci/</li>
 * </ul>
 * </p>
 * <p>
 * Apache Accumulo [0] has a simple test suite that verifies that data is not lost at scale. This
 * test suite is called continuous ingest. This test runs many ingest clients that continually
 * create linked lists containing 25 million nodes. At some point the clients are stopped and a map
 * reduce job is run to ensure no linked list has a hole. A hole indicates data was lost.
 * </p>
 * <p>
 * The nodes in the linked list are random. This causes each linked list to spread across the
 * table. Therefore if one part of a table loses data, it will be detected by references in another
 * part of the table.
 * </p>
 * <p>
 * <h3>THE ANATOMY OF THE TEST</h3> Below is a rough sketch of how data is written. For specific
 * details look at the Generator code.
 * </p>
 * <p>
 * <ol>
 * <li>Write out 1 million nodes (1M is the configurable 'width' mentioned below)</li>
 * <li>Flush the client</li>
 * <li>Write out 1 million nodes that reference the previous million</li>
 * <li>If this is the 25th set of 1 million nodes, then update the 1st set of 1 million to point to
 * the last set (25 is configurable; it's the 'wrap multiplier' referred to below)</li>
 * <li>goto 1</li>
 * </ol>
 * </p>
 * <p>
 * The key is that nodes only reference flushed nodes. Therefore a node should never reference a
 * missing node, even if the ingest client is killed at any point in time.
 * </p>
 * <p>
 * When running this test suite with Accumulo, a script called the Agitator runs in parallel,
 * randomly and continuously killing server processes. Many data loss bugs were found in Accumulo
 * this way. This test suite can also help find bugs that impact uptime and stability when run for
 * days or weeks.
 * </p>
 * <p>
 * This test suite consists of the following:
 * <ul>
 * <li>a few Java programs</li>
 * <li>a little helper script to run the Java programs</li>
 * <li>a maven script to build it</li>
 * </ul>
 * </p>
 * <p>
 * When generating data, it is best to have each map task generate a multiple of 25 million nodes.
 * The reason for this is that circular linked lists are generated every 25M nodes. Not generating
 * a multiple of 25M will result in some nodes in the linked list not having references, and the
 * loss of an unreferenced node cannot be detected.
 * </p>
 * <p>
 * <h3>Below is a description of the Java programs</h3>
 * <ul>
 * <li>{@code Generator} - A map only job that generates data. As stated previously, it is best to
 * generate data in multiples of 25M. An option is also available to allow concurrent walkers to
 * select and walk random flushed loops during this phase.</li>
 * <li>{@code Verify} - A map reduce job that looks for holes. Look at the counts after running.
 * {@code REFERENCED} and {@code UNREFERENCED} are ok, any {@code UNDEFINED} counts are bad.
 * Do not run at the same time as the Generator.</li>
 * <li>{@code Walker} - A standalone program that starts following a linked list and emits timing
 * info.</li>
 * <li>{@code Print} - A standalone program that prints nodes in the linked list.</li>
 * <li>{@code Delete} - A standalone program that deletes a single node.</li>
 * </ul>
 * This class can be run as a unit test, as an integration test, or from the command line.
 * </p>
 * <p>
 * ex:
 *
 * <pre>
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
 *     loop 2 1 100000 /temp 1 1000 50 1 0
 * </pre>
 * </p>
 */
@Category(IntegrationTests.class)
public class IntegrationTestBigLinkedList extends IntegrationTestBase {
  protected static final byte[] NO_KEY = new byte[1];
  protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
  protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
  protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
  private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
  private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");

  // link to the id of the prev node in the linked list
  protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");

  // identifier of the mapred task that generated this row
  protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");

  // the id of the row within the same client.
  protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");

  /** How many rows to write per map task. This has to be a multiple of 25M. */
  private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY =
    "IntegrationTestBigLinkedList.generator.num_rows";

  private static final String GENERATOR_NUM_MAPPERS_KEY =
    "IntegrationTestBigLinkedList.generator.map.tasks";

  private static final String GENERATOR_WIDTH_KEY = "IntegrationTestBigLinkedList.generator.width";

  private static final String GENERATOR_WRAP_KEY = "IntegrationTestBigLinkedList.generator.wrap";

  private static final String CONCURRENT_WALKER_KEY =
    "IntegrationTestBigLinkedList.generator.concurrentwalkers";

  protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster

  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters

  private static final int WIDTH_DEFAULT = 1000000;

  /**
   * The 'wrap multiplier' default.
   */
  private static final int WRAP_DEFAULT = 25;
  private static final int ROWKEY_LENGTH = 16;

  private static final int CONCURRENT_WALKER_DEFAULT = 0;

  protected String toRun;
  protected String[] otherArgs;

  static class CINode {
    byte[] key;
    byte[] prev;
    String client;
    long count;
  }

  /**
   * A Map only job that generates random linked lists and stores them.
   */
  static class Generator extends Configured implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(Generator.class);

    /**
     * Set this configuration if you want to test that single column family flush works. If set,
     * we will add a big column family and a small column family on either side of the usual ITBLL
     * 'meta' column family. When we write out the ITBLL, we will also add to the big column family
     * a value bigger than that for ITBLL and, for the small family, something way smaller. The idea
The idea is that when 280 * flush-by-column family rather than by region is enabled, we can see if ITBLL is broke in any 281 * way. Here is how you would pass it: 282 * <p> 283 * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList 284 * -Dgenerator.multiple.columnfamilies=true generator 1 10 g 285 */ 286 public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY = 287 "generator.multiple.columnfamilies"; 288 289 /** 290 * Set this configuration if you want to scale up the size of test data quickly. 291 * <p> 292 * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList 293 * -Dgenerator.big.family.value.size=1024 generator 1 10 output 294 */ 295 public static final String BIG_FAMILY_VALUE_SIZE_KEY = "generator.big.family.value.size"; 296 297 public static enum GeneratorCounts { 298 SUCCESS, 299 TERMINATING, 300 UNDEFINED, 301 IOEXCEPTION 302 } 303 304 public static final String USAGE = "Usage : " + Generator.class.getSimpleName() 305 + " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>" 306 + " <num walker threads>] \n" 307 + "Where <num nodes per map> should be a multiple of 'width' * 'wrap multiplier'.\n" 308 + "25M is default because default 'width' is 1M and default 'wrap multiplier' is 25.\n" 309 + "We write out 1M nodes and then flush the client. After 25 flushes, we connect \n" 310 + "first written nodes back to the 25th set.\n" 311 + "Walkers verify random flushed loops during Generation."; 312 313 public Job job; 314 315 static class GeneratorInputFormat extends InputFormat<BytesWritable, NullWritable> { 316 static class GeneratorInputSplit extends InputSplit implements Writable { 317 @Override 318 public long getLength() throws IOException, InterruptedException { 319 return 1; 320 } 321 322 @Override 323 public String[] getLocations() throws IOException, InterruptedException { 324 return new String[0]; 325 } 326 327 @Override 328 public void readFields(DataInput arg0) throws IOException { 329 } 330 331 @Override 332 public void write(DataOutput arg0) throws IOException { 333 } 334 } 335 336 static class GeneratorRecordReader extends RecordReader<BytesWritable, NullWritable> { 337 private long count; 338 private long numNodes; 339 // Use Random64 to avoid issue described in HBASE-21256. 
340 private Random64 rand = new Random64(); 341 342 @Override 343 public void close() throws IOException { 344 } 345 346 @Override 347 public BytesWritable getCurrentKey() throws IOException, InterruptedException { 348 byte[] bytes = new byte[ROWKEY_LENGTH]; 349 rand.nextBytes(bytes); 350 return new BytesWritable(bytes); 351 } 352 353 @Override 354 public NullWritable getCurrentValue() throws IOException, InterruptedException { 355 return NullWritable.get(); 356 } 357 358 @Override 359 public float getProgress() throws IOException, InterruptedException { 360 return (float) (count / (double) numNodes); 361 } 362 363 @Override 364 public void initialize(InputSplit arg0, TaskAttemptContext context) 365 throws IOException, InterruptedException { 366 numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000); 367 } 368 369 @Override 370 public boolean nextKeyValue() throws IOException, InterruptedException { 371 return count++ < numNodes; 372 } 373 } 374 375 @Override 376 public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split, 377 TaskAttemptContext context) throws IOException, InterruptedException { 378 GeneratorRecordReader rr = new GeneratorRecordReader(); 379 rr.initialize(split, context); 380 return rr; 381 } 382 383 @Override 384 public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException { 385 int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1); 386 387 ArrayList<InputSplit> splits = new ArrayList<>(numMappers); 388 389 for (int i = 0; i < numMappers; i++) { 390 splits.add(new GeneratorInputSplit()); 391 } 392 393 return splits; 394 } 395 } 396 397 /** Ensure output files from prev-job go to map inputs for current job */ 398 static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> { 399 @Override 400 protected boolean isSplitable(JobContext context, Path filename) { 401 return false; 402 } 403 } 404 405 /** 406 * Some ASCII art time: 407 * <p> 408 * [ . . . ] represents one batch of random longs of length WIDTH 409 * 410 * <pre> 411 * _________________________ 412 * | ______ | 413 * | | || 414 * .-+-----------------+-----.|| 415 * | | | ||| 416 * first = [ . . . . . . . . . . . ] ||| 417 * ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ||| 418 * | | | | | | | | | | | ||| 419 * prev = [ . . . . . . . . . . . ] ||| 420 * ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ||| 421 * | | | | | | | | | | | ||| 422 * current = [ . . . . . . . . . . . ] ||| 423 * ||| 424 * ... ||| 425 * ||| 426 * last = [ . . . . . . . . . . . ] ||| 427 * ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____||| 428 * | |________|| 429 * |___________________________| 430 * </pre> 431 */ 432 433 static class GeneratorMapper 434 extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> { 435 436 byte[][] first = null; 437 byte[][] prev = null; 438 byte[][] current = null; 439 byte[] id; 440 long count = 0; 441 int i; 442 BufferedMutator mutator; 443 Connection connection; 444 long numNodes; 445 long wrap; 446 int width; 447 boolean multipleUnevenColumnFamilies; 448 byte[] tinyValue = new byte[] { 't' }; 449 byte[] bigValue = null; 450 Configuration conf; 451 // Use Random64 to avoid issue described in HBASE-21256. 
452 private Random64 rand = new Random64(); 453 454 volatile boolean walkersStop; 455 int numWalkers; 456 final Object flushedLoopsLock = new Object(); 457 volatile List<Long> flushedLoops = new ArrayList<>(); 458 List<Thread> walkers = new ArrayList<>(); 459 460 @Override 461 protected void setup(Context context) throws IOException, InterruptedException { 462 id = Bytes.toBytes("Job: " + context.getJobID() + " Task: " + context.getTaskAttemptID()); 463 this.connection = ConnectionFactory.createConnection(context.getConfiguration()); 464 instantiateHTable(); 465 this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT); 466 current = new byte[this.width][]; 467 int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT); 468 this.wrap = (long) wrapMultiplier * width; 469 this.numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 470 (long) WIDTH_DEFAULT * WRAP_DEFAULT); 471 if (this.numNodes < this.wrap) { 472 this.wrap = this.numNodes; 473 } 474 this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration()); 475 this.numWalkers = 476 context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT); 477 this.walkersStop = false; 478 this.conf = context.getConfiguration(); 479 480 if (multipleUnevenColumnFamilies) { 481 int n = context.getConfiguration().getInt(BIG_FAMILY_VALUE_SIZE_KEY, 256); 482 int limit = 483 context.getConfiguration().getInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, 484 ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT); 485 486 Preconditions.checkArgument(n <= limit, "%s(%s) > %s(%s)", BIG_FAMILY_VALUE_SIZE_KEY, n, 487 ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, limit); 488 489 bigValue = new byte[n]; 490 rand.nextBytes(bigValue); 491 LOG.info("Create a bigValue with " + n + " bytes."); 492 } 493 494 Preconditions.checkArgument(numNodes > 0, "numNodes(%s) <= 0", numNodes); 495 Preconditions.checkArgument(numNodes % width == 0, "numNodes(%s) mod width(%s) != 0", 496 numNodes, width); 497 Preconditions.checkArgument(numNodes % wrap == 0, "numNodes(%s) mod wrap(%s) != 0", 498 numNodes, wrap); 499 } 500 501 protected void instantiateHTable() throws IOException { 502 mutator = connection 503 .getBufferedMutator(new BufferedMutatorParams(getTableName(connection.getConfiguration())) 504 .writeBufferSize(4 * 1024 * 1024)); 505 } 506 507 @Override 508 protected void cleanup(Context context) throws IOException, InterruptedException { 509 joinWalkers(); 510 mutator.close(); 511 connection.close(); 512 } 513 514 @Override 515 protected void map(BytesWritable key, NullWritable value, Context output) throws IOException { 516 current[i] = new byte[key.getLength()]; 517 System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength()); 518 if (++i == current.length) { 519 LOG.debug("Persisting current.length={}, count={}, id={}, current={}, i=", current.length, 520 count, Bytes.toStringBinary(id), Bytes.toStringBinary(current[0]), i); 521 persist(output, count, prev, current, id); 522 i = 0; 523 524 if (first == null) { 525 first = current; 526 } 527 prev = current; 528 current = new byte[this.width][]; 529 530 count += current.length; 531 output.setStatus("Count " + count); 532 533 if (count % wrap == 0) { 534 // this block of code turns the 1 million linked list of length 25 into one giant 535 // circular linked list of 25 million 536 circularLeftShift(first); 537 persist(output, -1, prev, first, null); 538 // At this point the entire loop has been 
flushed so we can add one of its nodes to the 539 // concurrent walker 540 if (numWalkers > 0) { 541 addFlushed(key.getBytes()); 542 if (walkers.isEmpty()) { 543 startWalkers(numWalkers, conf, output); 544 } 545 } 546 first = null; 547 prev = null; 548 } 549 } 550 } 551 552 private static <T> void circularLeftShift(T[] first) { 553 T ez = first[0]; 554 System.arraycopy(first, 1, first, 0, first.length - 1); 555 first[first.length - 1] = ez; 556 } 557 558 private void addFlushed(byte[] rowKey) { 559 synchronized (flushedLoopsLock) { 560 flushedLoops.add(Bytes.toLong(rowKey)); 561 flushedLoopsLock.notifyAll(); 562 } 563 } 564 565 protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id) 566 throws IOException { 567 for (int i = 0; i < current.length; i++) { 568 569 if (i % 100 == 0) { 570 // Tickle progress every so often else maprunner will think us hung 571 output.progress(); 572 } 573 574 Put put = new Put(current[i]); 575 put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]); 576 577 if (count >= 0) { 578 put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i)); 579 } 580 if (id != null) { 581 put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id); 582 } 583 // See if we are to write multiple columns. 584 if (this.multipleUnevenColumnFamilies) { 585 // Use any column name. 586 put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue); 587 // Use any column name. 588 put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue); 589 } 590 mutator.mutate(put); 591 } 592 593 mutator.flush(); 594 } 595 596 private void startWalkers(int numWalkers, Configuration conf, Context context) { 597 LOG.info("Starting " + numWalkers + " concurrent walkers"); 598 for (int i = 0; i < numWalkers; i++) { 599 Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context)); 600 walker.start(); 601 walkers.add(walker); 602 } 603 } 604 605 private void joinWalkers() { 606 synchronized (flushedLoopsLock) { 607 walkersStop = true; 608 flushedLoopsLock.notifyAll(); 609 } 610 for (Thread walker : walkers) { 611 Uninterruptibles.joinUninterruptibly(walker); 612 } 613 } 614 615 /** 616 * Randomly selects and walks a random flushed loop concurrently with the Generator Mapper by 617 * spawning ConcurrentWalker's with specified StartNodes. These ConcurrentWalker's are 618 * configured to only log erroneous nodes. 
619 */ 620 621 public class ContinuousConcurrentWalker implements Runnable { 622 623 ConcurrentWalker walker; 624 Configuration conf; 625 Context context; 626 627 public ContinuousConcurrentWalker(Configuration conf, Context context) { 628 this.conf = conf; 629 this.context = context; 630 } 631 632 @Override 633 public void run() { 634 while (!walkersStop) { 635 try { 636 long node = selectLoop(); 637 try { 638 walkLoop(node); 639 } catch (IOException e) { 640 context.getCounter(GeneratorCounts.IOEXCEPTION).increment(1l); 641 return; 642 } 643 } catch (InterruptedException e) { 644 return; 645 } 646 } 647 } 648 649 private void walkLoop(long node) throws IOException { 650 walker = new ConcurrentWalker(context); 651 walker.setConf(conf); 652 walker.run(node, wrap); 653 } 654 655 private long selectLoop() throws InterruptedException { 656 synchronized (flushedLoopsLock) { 657 while (flushedLoops.isEmpty() && !walkersStop) { 658 flushedLoopsLock.wait(); 659 } 660 if (walkersStop) { 661 throw new InterruptedException(); 662 } 663 return flushedLoops.get(ThreadLocalRandom.current().nextInt(flushedLoops.size())); 664 } 665 } 666 } 667 668 public static class ConcurrentWalker extends WalkerBase { 669 670 Context context; 671 672 public ConcurrentWalker(Context context) { 673 this.context = context; 674 } 675 676 public void run(long startKeyIn, long maxQueriesIn) throws IOException { 677 678 long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE; 679 byte[] startKey = Bytes.toBytes(startKeyIn); 680 681 Connection connection = ConnectionFactory.createConnection(getConf()); 682 Table table = connection.getTable(getTableName(getConf())); 683 long numQueries = 0; 684 // If isSpecificStart is set, only walk one list from that particular node. 685 // Note that in case of circular (or P-shaped) list it will walk forever, as is 686 // the case in normal run without startKey. 687 688 CINode node = findStartNode(table, startKey); 689 if (node == null) { 690 LOG.error("Start node not found: " + Bytes.toStringBinary(startKey)); 691 throw new IOException("Start node not found: " + startKeyIn); 692 } 693 while (numQueries < maxQueries) { 694 numQueries++; 695 byte[] prev = node.prev; 696 node = getNode(prev, table, node); 697 if (node == null) { 698 LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev)); 699 context.getCounter(GeneratorCounts.UNDEFINED).increment(1l); 700 } else if (node.prev.length == NO_KEY.length) { 701 LOG.error( 702 "ConcurrentWalker found TERMINATING NODE: " + Bytes.toStringBinary(node.key)); 703 context.getCounter(GeneratorCounts.TERMINATING).increment(1l); 704 } else { 705 // Increment for successful walk 706 context.getCounter(GeneratorCounts.SUCCESS).increment(1l); 707 } 708 } 709 table.close(); 710 connection.close(); 711 } 712 } 713 } 714 715 @Override 716 public int run(String[] args) throws Exception { 717 if (args.length < 3) { 718 System.err.println(USAGE); 719 return 1; 720 } 721 try { 722 int numMappers = Integer.parseInt(args[0]); 723 long numNodes = Long.parseLong(args[1]); 724 Path tmpOutput = new Path(args[2]); 725 Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]); 726 Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]); 727 Integer numWalkers = (args.length < 6) ? 
null : Integer.parseInt(args[5]); 728 return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers); 729 } catch (NumberFormatException e) { 730 System.err.println("Parsing generator arguments failed: " + e.getMessage()); 731 System.err.println(USAGE); 732 return 1; 733 } 734 } 735 736 protected void createSchema() throws IOException { 737 Configuration conf = getConf(); 738 TableName tableName = getTableName(conf); 739 try (Connection conn = ConnectionFactory.createConnection(conf); 740 Admin admin = conn.getAdmin()) { 741 if (!admin.tableExists(tableName)) { 742 TableDescriptor tableDescriptor = TableDescriptorBuilder 743 .newBuilder(getTableName(getConf())) 744 // if -DuseMob=true force all data through mob path. 745 .setColumnFamily( 746 setMobProperties(conf, ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME)).build()) 747 // Always add these families. Just skip writing to them when we do not test per CF 748 // flush. 749 .setColumnFamily( 750 setMobProperties(conf, ColumnFamilyDescriptorBuilder.newBuilder(BIG_FAMILY_NAME)) 751 .build()) 752 .setColumnFamily( 753 setMobProperties(conf, ColumnFamilyDescriptorBuilder.newBuilder(TINY_FAMILY_NAME)) 754 .build()) 755 .build(); 756 757 // If we want to pre-split compute how many splits. 758 if (conf.getBoolean(PRESPLIT_TEST_TABLE_KEY, PRESPLIT_TEST_TABLE)) { 759 int numberOfServers = admin.getRegionServers().size(); 760 if (numberOfServers == 0) { 761 throw new IllegalStateException("No live regionservers"); 762 } 763 int regionsPerServer = conf.getInt(REGIONS_PER_SERVER_KEY, DEFAULT_REGIONS_PER_SERVER); 764 int totalNumberOfRegions = numberOfServers * regionsPerServer; 765 LOG.info("Number of live regionservers: " + numberOfServers + ", " 766 + "pre-splitting table into " + totalNumberOfRegions + " regions " 767 + "(default regions per server: " + regionsPerServer + ")"); 768 769 byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions); 770 771 admin.createTable(tableDescriptor, splits); 772 } else { 773 // Looks like we're just letting things play out. 774 // Create a table with on region by default. 775 // This will make the splitting work hard. 776 admin.createTable(tableDescriptor); 777 } 778 } 779 } catch (MasterNotRunningException e) { 780 LOG.error("Master not running", e); 781 throw new IOException(e); 782 } 783 } 784 785 public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput, Integer width, 786 Integer wrapMultiplier, Integer numWalkers) throws Exception { 787 LOG.info( 788 "Running RandomInputGenerator with numMappers=" + numMappers + ", numNodes=" + numNodes); 789 Job job = Job.getInstance(getConf()); 790 791 job.setJobName("Random Input Generator"); 792 job.setNumReduceTasks(0); 793 job.setJarByClass(getClass()); 794 795 job.setInputFormatClass(GeneratorInputFormat.class); 796 job.setOutputKeyClass(BytesWritable.class); 797 job.setOutputValueClass(NullWritable.class); 798 799 setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers); 800 801 job.setMapperClass(Mapper.class); // identity mapper 802 803 FileOutputFormat.setOutputPath(job, tmpOutput); 804 job.setOutputFormatClass(SequenceFileOutputFormat.class); 805 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Random64.class); 806 807 boolean success = jobCompletion(job); 808 809 return success ? 
0 : 1; 810 } 811 812 public int runGenerator(int numMappers, long numNodes, Path tmpOutput, Integer width, 813 Integer wrapMultiplier, Integer numWalkers) throws Exception { 814 LOG.info("Running Generator with numMappers=" + numMappers + ", numNodes=" + numNodes); 815 createSchema(); 816 job = Job.getInstance(getConf()); 817 818 job.setJobName("Link Generator"); 819 job.setNumReduceTasks(0); 820 job.setJarByClass(getClass()); 821 822 FileInputFormat.setInputPaths(job, tmpOutput); 823 job.setInputFormatClass(OneFilePerMapperSFIF.class); 824 job.setOutputKeyClass(NullWritable.class); 825 job.setOutputValueClass(NullWritable.class); 826 827 setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers); 828 829 setMapperForGenerator(job); 830 831 job.setOutputFormatClass(NullOutputFormat.class); 832 833 job.getConfiguration().setBoolean("mapreduce.map.speculative", false); 834 TableMapReduceUtil.addDependencyJars(job); 835 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), 836 AbstractHBaseTool.class); 837 TableMapReduceUtil.initCredentials(job); 838 839 boolean success = jobCompletion(job); 840 841 return success ? 0 : 1; 842 } 843 844 protected boolean jobCompletion(Job job) 845 throws IOException, InterruptedException, ClassNotFoundException { 846 boolean success = job.waitForCompletion(true); 847 return success; 848 } 849 850 protected void setMapperForGenerator(Job job) { 851 job.setMapperClass(GeneratorMapper.class); 852 } 853 854 public int run(int numMappers, long numNodes, Path tmpOutput, Integer width, 855 Integer wrapMultiplier, Integer numWalkers) throws Exception { 856 int ret = 857 runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers); 858 if (ret > 0) { 859 return ret; 860 } 861 return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers); 862 } 863 864 public boolean verify() { 865 try { 866 Counters counters = job.getCounters(); 867 if (counters == null) { 868 LOG.info("Counters object was null, Generator verification cannot be performed." 869 + " This is commonly a result of insufficient YARN configuration."); 870 return false; 871 } 872 873 if ( 874 counters.findCounter(GeneratorCounts.TERMINATING).getValue() > 0 875 || counters.findCounter(GeneratorCounts.UNDEFINED).getValue() > 0 876 || counters.findCounter(GeneratorCounts.IOEXCEPTION).getValue() > 0 877 ) { 878 LOG.error("Concurrent walker failed to verify during Generation phase"); 879 LOG.error( 880 "TERMINATING nodes: " + counters.findCounter(GeneratorCounts.TERMINATING).getValue()); 881 LOG.error( 882 "UNDEFINED nodes: " + counters.findCounter(GeneratorCounts.UNDEFINED).getValue()); 883 LOG.error( 884 "IOEXCEPTION nodes: " + counters.findCounter(GeneratorCounts.IOEXCEPTION).getValue()); 885 return false; 886 } 887 } catch (IOException e) { 888 LOG.info("Generator verification could not find counter"); 889 return false; 890 } 891 return true; 892 } 893 } 894 895 private static ColumnFamilyDescriptorBuilder setMobProperties(final Configuration conf, 896 final ColumnFamilyDescriptorBuilder builder) { 897 if (conf.getBoolean("useMob", false)) { 898 builder.setMobEnabled(true); 899 builder.setMobThreshold(4); 900 } 901 return builder; 902 } 903 904 /** 905 * Tool to search missing rows in WALs and hfiles. Pass in file or dir of keys to search for. Key 906 * file must have been written by Verify step (we depend on the format it writes out. 
We'll read 907 * them in and then search in hbase WALs and oldWALs dirs (Some of this is TODO). 908 */ 909 static class Search extends Configured implements Tool { 910 private static final Logger LOG = LoggerFactory.getLogger(Search.class); 911 protected Job job; 912 913 private static void printUsage(final String error) { 914 if (error != null && error.length() > 0) System.out.println("ERROR: " + error); 915 System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]"); 916 } 917 918 @Override 919 public int run(String[] args) throws Exception { 920 if (args.length < 1 || args.length > 2) { 921 printUsage(null); 922 return 1; 923 } 924 Path inputDir = new Path(args[0]); 925 int numMappers = 1; 926 if (args.length > 1) { 927 numMappers = Integer.parseInt(args[1]); 928 } 929 return run(inputDir, numMappers); 930 } 931 932 /** 933 * WALPlayer override that searches for keys loaded in the setup. 934 */ 935 public static class WALSearcher extends WALPlayer { 936 public WALSearcher(Configuration conf) { 937 super(conf); 938 } 939 940 /** 941 * The actual searcher mapper. 942 */ 943 public static class WALMapperSearcher extends WALMapper { 944 private SortedSet<byte[]> keysToFind; 945 private AtomicInteger rows = new AtomicInteger(0); 946 947 @Override 948 public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context) 949 throws IOException { 950 super.setup(context); 951 try { 952 this.keysToFind = readKeysToSearch(context.getConfiguration()); 953 LOG.info("Loaded keys to find: count=" + this.keysToFind.size()); 954 } catch (InterruptedException e) { 955 throw new InterruptedIOException(e.toString()); 956 } 957 } 958 959 @Override 960 protected boolean filter(Context context, Cell cell) { 961 // TODO: Can I do a better compare than this copying out key? 962 byte[] row = new byte[cell.getRowLength()]; 963 System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength()); 964 boolean b = this.keysToFind.contains(row); 965 if (b) { 966 String keyStr = Bytes.toStringBinary(row); 967 try { 968 LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey()); 969 } catch (IOException | InterruptedException e) { 970 LOG.warn(e.toString(), e); 971 } 972 if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { 973 context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1); 974 } 975 context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1); 976 } 977 return b; 978 } 979 } 980 981 // Put in place the above WALMapperSearcher. 982 @Override 983 public Job createSubmittableJob(String[] args) throws IOException { 984 Job job = super.createSubmittableJob(args); 985 // Call my class instead. 986 job.setJarByClass(WALMapperSearcher.class); 987 job.setMapperClass(WALMapperSearcher.class); 988 job.setOutputFormatClass(NullOutputFormat.class); 989 return job; 990 } 991 } 992 993 static final String FOUND_GROUP_KEY = "Found"; 994 static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir"; 995 996 public int run(Path inputDir, int numMappers) throws Exception { 997 getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString()); 998 SortedSet<byte[]> keys = readKeysToSearch(getConf()); 999 if (keys.isEmpty()) throw new RuntimeException("No keys to find"); 1000 LOG.info("Count of keys to find: " + keys.size()); 1001 for (byte[] key : keys) 1002 LOG.info("Key: " + Bytes.toStringBinary(key)); 1003 // Now read all WALs. In two dirs. Presumes certain layout. 
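      // i.e. the active WAL directory (HConstants.HREGION_LOGDIR_NAME, "WALs") and the archive
      // directory (HConstants.HREGION_OLDLOGDIR_NAME, "oldWALs") under the WAL root dir.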
1004 Path walsDir = 1005 new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME); 1006 Path oldWalsDir = 1007 new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME); 1008 LOG.info("Running Search with keys inputDir=" + inputDir + ", numMappers=" + numMappers 1009 + " against " + getConf().get(HConstants.HBASE_DIR)); 1010 int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()), 1011 new String[] { walsDir.toString(), "" }); 1012 if (ret != 0) { 1013 return ret; 1014 } 1015 return ToolRunner.run(getConf(), new WALSearcher(getConf()), 1016 new String[] { oldWalsDir.toString(), "" }); 1017 } 1018 1019 static SortedSet<byte[]> readKeysToSearch(final Configuration conf) 1020 throws IOException, InterruptedException { 1021 Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY)); 1022 FileSystem fs = FileSystem.get(conf); 1023 SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1024 if (!fs.exists(keysInputDir)) { 1025 throw new FileNotFoundException(keysInputDir.toString()); 1026 } 1027 if (!fs.isDirectory(keysInputDir)) { 1028 throw new UnsupportedOperationException("TODO"); 1029 } else { 1030 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false); 1031 while (iterator.hasNext()) { 1032 LocatedFileStatus keyFileStatus = iterator.next(); 1033 // Skip "_SUCCESS" file. 1034 if (keyFileStatus.getPath().getName().startsWith("_")) continue; 1035 result.addAll(readFileToSearch(conf, keyFileStatus)); 1036 } 1037 } 1038 return result; 1039 } 1040 1041 private static SortedSet<byte[]> readFileToSearch(final Configuration conf, 1042 final LocatedFileStatus keyFileStatus) throws IOException, InterruptedException { 1043 SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1044 // Return entries that are flagged VerifyCounts.UNDEFINED in the value. Return the row. This 1045 // is 1046 // what is missing. 1047 TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); 1048 try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr = 1049 new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) { 1050 InputSplit is = 1051 new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String[] {}); 1052 rr.initialize(is, context); 1053 while (rr.nextKeyValue()) { 1054 rr.getCurrentKey(); 1055 BytesWritable bw = rr.getCurrentValue(); 1056 if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.VerifyCounts.UNDEFINED) { 1057 byte[] key = new byte[rr.getCurrentKey().getLength()]; 1058 System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, 1059 rr.getCurrentKey().getLength()); 1060 result.add(key); 1061 } 1062 } 1063 } 1064 return result; 1065 } 1066 } 1067 1068 /** 1069 * A Map Reduce job that verifies that the linked lists generated by {@link Generator} do not have 1070 * any holes. 
1071 */ 1072 static class Verify extends Configured implements Tool { 1073 private static final Logger LOG = LoggerFactory.getLogger(Verify.class); 1074 protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 }); 1075 protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 }); 1076 protected Job job; 1077 1078 public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> { 1079 private BytesWritable row = new BytesWritable(); 1080 private BytesWritable ref = new BytesWritable(); 1081 private boolean multipleUnevenColumnFamilies; 1082 1083 @Override 1084 protected void 1085 setup(Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context) 1086 throws IOException, InterruptedException { 1087 this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration()); 1088 } 1089 1090 @Override 1091 protected void map(ImmutableBytesWritable key, Result value, Context context) 1092 throws IOException, InterruptedException { 1093 byte[] rowKey = key.get(); 1094 row.set(rowKey, 0, rowKey.length); 1095 if ( 1096 multipleUnevenColumnFamilies && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) 1097 || !value.containsColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME)) 1098 ) { 1099 context.write(row, DEF_LOST_FAMILIES); 1100 } else { 1101 context.write(row, DEF); 1102 } 1103 byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV); 1104 if (prev != null && prev.length > 0) { 1105 ref.set(prev, 0, prev.length); 1106 context.write(ref, row); 1107 } else { 1108 LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey))); 1109 } 1110 } 1111 } 1112 1113 /** 1114 * Don't change the order of these enums. Their ordinals are used as type flag when we emit 1115 * problems found from the reducer. 1116 */ 1117 public static enum VerifyCounts { 1118 UNREFERENCED, 1119 UNDEFINED, 1120 REFERENCED, 1121 CORRUPT, 1122 EXTRAREFERENCES, 1123 EXTRA_UNDEF_REFERENCES, 1124 LOST_FAMILIES 1125 } 1126 1127 /** 1128 * Per reducer, we output problem rows as byte arrays so can be used as input for subsequent 1129 * investigative mapreduce jobs. Each emitted value is prefaced by a one byte flag saying what 1130 * sort of emission it is. Flag is the Count enum ordinal as a short. 
1131 */ 1132 public static class VerifyReducer 1133 extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> { 1134 private ArrayList<byte[]> refs = new ArrayList<>(); 1135 private final BytesWritable UNREF = 1136 new BytesWritable(addPrefixFlag(VerifyCounts.UNREFERENCED.ordinal(), new byte[] {})); 1137 private final BytesWritable LOSTFAM = 1138 new BytesWritable(addPrefixFlag(VerifyCounts.LOST_FAMILIES.ordinal(), new byte[] {})); 1139 1140 private AtomicInteger rows = new AtomicInteger(0); 1141 private Connection connection; 1142 1143 @Override 1144 protected void 1145 setup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context) 1146 throws IOException, InterruptedException { 1147 super.setup(context); 1148 this.connection = ConnectionFactory.createConnection(context.getConfiguration()); 1149 } 1150 1151 @Override 1152 protected void 1153 cleanup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context) 1154 throws IOException, InterruptedException { 1155 if (this.connection != null) { 1156 this.connection.close(); 1157 } 1158 super.cleanup(context); 1159 } 1160 1161 /** 1162 * Returns new byte array that has <code>ordinal</code> as prefix on front taking up 1163 * Bytes.SIZEOF_SHORT bytes followed by <code>r</code> 1164 */ 1165 public static byte[] addPrefixFlag(final int ordinal, final byte[] r) { 1166 byte[] prefix = Bytes.toBytes((short) ordinal); 1167 if (prefix.length != Bytes.SIZEOF_SHORT) { 1168 throw new RuntimeException("Unexpected size: " + prefix.length); 1169 } 1170 byte[] result = new byte[prefix.length + r.length]; 1171 System.arraycopy(prefix, 0, result, 0, prefix.length); 1172 System.arraycopy(r, 0, result, prefix.length, r.length); 1173 return result; 1174 } 1175 1176 /** 1177 * Returns type from the Counts enum of this row. Reads prefix added by 1178 * {@link #addPrefixFlag(int, byte[])} 1179 */ 1180 public static VerifyCounts whichType(final byte[] bs) { 1181 int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT); 1182 return VerifyCounts.values()[ordinal]; 1183 } 1184 1185 /** Returns Row bytes minus the type flag. */ 1186 public static byte[] getRowOnly(BytesWritable bw) { 1187 byte[] bytes = new byte[bw.getLength() - Bytes.SIZEOF_SHORT]; 1188 System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length); 1189 return bytes; 1190 } 1191 1192 @Override 1193 public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context) 1194 throws IOException, InterruptedException { 1195 int defCount = 0; 1196 boolean lostFamilies = false; 1197 refs.clear(); 1198 for (BytesWritable type : values) { 1199 if (type.getLength() == DEF.getLength()) { 1200 defCount++; 1201 if (type.getBytes()[0] == 1) { 1202 lostFamilies = true; 1203 } 1204 } else { 1205 byte[] bytes = new byte[type.getLength()]; 1206 System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength()); 1207 refs.add(bytes); 1208 } 1209 } 1210 1211 // TODO check for more than one def, should not happen 1212 StringBuilder refsSb = null; 1213 if (defCount == 0 || refs.size() != 1) { 1214 String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); 1215 refsSb = dumpExtraInfoOnRefs(key, context, refs); 1216 LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" 1217 + (refsSb != null ? 
refsSb.toString() : "")); 1218 } 1219 if (lostFamilies) { 1220 String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); 1221 LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families"); 1222 context.getCounter(VerifyCounts.LOST_FAMILIES).increment(1); 1223 context.write(key, LOSTFAM); 1224 } 1225 1226 if (defCount == 0 && refs.size() > 0) { 1227 // This is bad, found a node that is referenced but not defined. It must have been 1228 // lost, emit some info about this node for debugging purposes. 1229 // Write out a line per reference. If more than one, flag it.; 1230 for (int i = 0; i < refs.size(); i++) { 1231 byte[] bs = refs.get(i); 1232 int ordinal; 1233 if (i <= 0) { 1234 ordinal = VerifyCounts.UNDEFINED.ordinal(); 1235 context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs))); 1236 context.getCounter(VerifyCounts.UNDEFINED).increment(1); 1237 } else { 1238 ordinal = VerifyCounts.EXTRA_UNDEF_REFERENCES.ordinal(); 1239 context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs))); 1240 } 1241 } 1242 if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { 1243 // Print out missing row; doing get on reference gives info on when the referencer 1244 // was added which can help a little debugging. This info is only available in mapper 1245 // output -- the 'Linked List error Key...' log message above. What we emit here is 1246 // useless for debugging. 1247 String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); 1248 context.getCounter("undef", keyString).increment(1); 1249 } 1250 } else if (defCount > 0 && refs.isEmpty()) { 1251 // node is defined but not referenced 1252 context.write(key, UNREF); 1253 context.getCounter(VerifyCounts.UNREFERENCED).increment(1); 1254 if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { 1255 String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); 1256 context.getCounter("unref", keyString).increment(1); 1257 } 1258 } else { 1259 if (refs.size() > 1) { 1260 // Skip first reference. 1261 for (int i = 1; i < refs.size(); i++) { 1262 context.write(key, new BytesWritable( 1263 addPrefixFlag(VerifyCounts.EXTRAREFERENCES.ordinal(), refs.get(i)))); 1264 } 1265 context.getCounter(VerifyCounts.EXTRAREFERENCES).increment(refs.size() - 1); 1266 } 1267 // node is defined and referenced 1268 context.getCounter(VerifyCounts.REFERENCED).increment(1); 1269 } 1270 } 1271 1272 /** 1273 * Dump out extra info around references if there are any. Helps debugging. 1274 * @return StringBuilder filled with references if any. 1275 */ 1276 @SuppressWarnings("JavaUtilDate") 1277 private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context, 1278 final List<byte[]> refs) throws IOException { 1279 StringBuilder refsSb = null; 1280 if (refs.isEmpty()) return refsSb; 1281 refsSb = new StringBuilder(); 1282 String comma = ""; 1283 // If a row is a reference but has no define, print the content of the row that has 1284 // this row as a 'prev'; it will help debug. The missing row was written just before 1285 // the row we are dumping out here. 1286 TableName tn = getTableName(context.getConfiguration()); 1287 try (Table t = this.connection.getTable(tn)) { 1288 for (byte[] ref : refs) { 1289 Result r = t.get(new Get(ref)); 1290 List<Cell> cells = r.listCells(); 1291 String ts = (cells != null && !cells.isEmpty()) 1292 ? 
new java.util.Date(cells.get(0).getTimestamp()).toString() 1293 : ""; 1294 byte[] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT); 1295 String jobStr = (b != null && b.length > 0) ? Bytes.toString(b) : ""; 1296 b = r.getValue(FAMILY_NAME, COLUMN_COUNT); 1297 long count = (b != null && b.length > 0) ? Bytes.toLong(b) : -1; 1298 b = r.getValue(FAMILY_NAME, COLUMN_PREV); 1299 String refRegionLocation = ""; 1300 String keyRegionLocation = ""; 1301 if (b != null && b.length > 0) { 1302 try (RegionLocator rl = this.connection.getRegionLocator(tn)) { 1303 HRegionLocation hrl = rl.getRegionLocation(b); 1304 if (hrl != null) refRegionLocation = hrl.toString(); 1305 // Key here probably has trailing zeros on it. 1306 hrl = rl.getRegionLocation(key.getBytes()); 1307 if (hrl != null) keyRegionLocation = hrl.toString(); 1308 } 1309 } 1310 LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) 1311 + ", refPrevEqualsKey=" 1312 + (Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) 1313 + ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) 1314 + ", ref row date=" + ts + ", jobStr=" + jobStr + ", ref row count=" + count 1315 + ", ref row regionLocation=" + refRegionLocation + ", key row regionLocation=" 1316 + keyRegionLocation); 1317 refsSb.append(comma); 1318 comma = ","; 1319 refsSb.append(Bytes.toStringBinary(ref)); 1320 } 1321 } 1322 return refsSb; 1323 } 1324 } 1325 1326 @Override 1327 public int run(String[] args) throws Exception { 1328 if (args.length != 2) { 1329 System.out 1330 .println("Usage : " + Verify.class.getSimpleName() + " <output dir> <num reducers>"); 1331 return 0; 1332 } 1333 1334 String outputDir = args[0]; 1335 int numReducers = Integer.parseInt(args[1]); 1336 1337 return run(outputDir, numReducers); 1338 } 1339 1340 public int run(String outputDir, int numReducers) throws Exception { 1341 return run(new Path(outputDir), numReducers); 1342 } 1343 1344 public int run(Path outputDir, int numReducers) throws Exception { 1345 LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers); 1346 1347 job = Job.getInstance(getConf()); 1348 1349 job.setJobName("Link Verifier"); 1350 job.setNumReduceTasks(numReducers); 1351 job.setJarByClass(getClass()); 1352 1353 setJobScannerConf(job); 1354 1355 Scan scan = new Scan(); 1356 scan.addColumn(FAMILY_NAME, COLUMN_PREV); 1357 scan.setCaching(10000); 1358 scan.setCacheBlocks(false); 1359 if (isMultiUnevenColumnFamilies(getConf())) { 1360 scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME); 1361 scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME); 1362 } 1363 1364 TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan, 1365 VerifyMapper.class, BytesWritable.class, BytesWritable.class, job); 1366 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), 1367 AbstractHBaseTool.class); 1368 1369 job.getConfiguration().setBoolean("mapreduce.map.speculative", false); 1370 1371 job.setReducerClass(VerifyReducer.class); 1372 job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class); 1373 job.setOutputKeyClass(BytesWritable.class); 1374 job.setOutputValueClass(BytesWritable.class); 1375 TextOutputFormat.setOutputPath(job, outputDir); 1376 1377 boolean success = job.waitForCompletion(true); 1378 1379 if (success) { 1380 Counters counters = job.getCounters(); 1381 if (null == counters) { 1382 LOG.warn("Counters were null, cannot verify Job completion." 
1383 + " This is commonly a result of insufficient YARN configuration."); 1384 // We don't have access to the counters to know if we have "bad" counts 1385 return 0; 1386 } 1387 1388 // If we find no unexpected values, the job didn't outright fail 1389 if (verifyUnexpectedValues(counters)) { 1390 // We didn't check referenced+unreferenced counts, leave that to visual inspection 1391 return 0; 1392 } 1393 } 1394 1395 // We failed 1396 return 1; 1397 } 1398 1399 public boolean verify(long expectedReferenced) throws Exception { 1400 if (job == null) { 1401 throw new IllegalStateException("You should call run() first"); 1402 } 1403 1404 Counters counters = job.getCounters(); 1405 if (counters == null) { 1406 LOG.info("Counters object was null, write verification cannot be performed." 1407 + " This is commonly a result of insufficient YARN configuration."); 1408 return false; 1409 } 1410 1411 // Run through each check, even if we fail one early 1412 boolean success = verifyExpectedValues(expectedReferenced, counters); 1413 1414 if (!verifyUnexpectedValues(counters)) { 1415 // We found counter objects which imply failure 1416 success = false; 1417 } 1418 1419 if (!success) { 1420 handleFailure(counters); 1421 } 1422 return success; 1423 } 1424 1425 /** 1426 * Verify the values in the Counters against the expected number of entries written. Expected 1427 * number of referenced entrires The Job's Counters object 1428 * @return True if the values match what's expected, false otherwise 1429 */ 1430 protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) { 1431 final Counter referenced = counters.findCounter(VerifyCounts.REFERENCED); 1432 final Counter unreferenced = counters.findCounter(VerifyCounts.UNREFERENCED); 1433 boolean success = true; 1434 1435 if (expectedReferenced != referenced.getValue()) { 1436 LOG.error("Expected referenced count does not match with actual referenced count. " 1437 + "expected referenced=" + expectedReferenced + " ,actual=" + referenced.getValue()); 1438 success = false; 1439 } 1440 1441 if (unreferenced.getValue() > 0) { 1442 final Counter multiref = counters.findCounter(VerifyCounts.EXTRAREFERENCES); 1443 boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue()); 1444 LOG.error( 1445 "Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue() 1446 + (couldBeMultiRef ? "; could be due to duplicate random numbers" : "")); 1447 success = false; 1448 } 1449 1450 return success; 1451 } 1452 1453 /** 1454 * Verify that the Counters don't contain values which indicate an outright failure from the 1455 * Reducers. The Job's counters 1456 * @return True if the "bad" counter objects are 0, false otherwise 1457 */ 1458 protected boolean verifyUnexpectedValues(Counters counters) { 1459 final Counter undefined = counters.findCounter(VerifyCounts.UNDEFINED); 1460 final Counter lostfamilies = counters.findCounter(VerifyCounts.LOST_FAMILIES); 1461 boolean success = true; 1462 1463 if (undefined.getValue() > 0) { 1464 LOG.error("Found an undefined node. 
    /**
     * Verify the values in the Counters against the expected number of entries written.
     * @param expectedReferenced Expected number of referenced entries
     * @param counters           The Job's Counters object
     * @return True if the values match what's expected, false otherwise
     */
    protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
      final Counter referenced = counters.findCounter(VerifyCounts.REFERENCED);
      final Counter unreferenced = counters.findCounter(VerifyCounts.UNREFERENCED);
      boolean success = true;

      if (expectedReferenced != referenced.getValue()) {
        LOG.error("Expected referenced count does not match with actual referenced count. "
          + "expected referenced=" + expectedReferenced + ", actual=" + referenced.getValue());
        success = false;
      }

      if (unreferenced.getValue() > 0) {
        final Counter multiref = counters.findCounter(VerifyCounts.EXTRAREFERENCES);
        boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
        LOG.error(
          "Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
            + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
        success = false;
      }

      return success;
    }

    /**
     * Verify that the Counters don't contain values which indicate an outright failure from the
     * Reducers.
     * @param counters The Job's counters
     * @return True if the "bad" counter objects are 0, false otherwise
     */
    protected boolean verifyUnexpectedValues(Counters counters) {
      final Counter undefined = counters.findCounter(VerifyCounts.UNDEFINED);
      final Counter lostfamilies = counters.findCounter(VerifyCounts.LOST_FAMILIES);
      boolean success = true;

      if (undefined.getValue() > 0) {
        LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
        success = false;
      }

      if (lostfamilies.getValue() > 0) {
        LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
        success = false;
      }

      return success;
    }

    protected void handleFailure(Counters counters) throws IOException {
      Configuration conf = job.getConfiguration();
      TableName tableName = getTableName(conf);
      try (Connection conn = ConnectionFactory.createConnection(conf)) {
        try (RegionLocator rl = conn.getRegionLocator(tableName)) {
          CounterGroup g = counters.getGroup("undef");
          Iterator<Counter> it = g.iterator();
          while (it.hasNext()) {
            String keyString = it.next().getName();
            byte[] key = Bytes.toBytes(keyString);
            HRegionLocation loc = rl.getRegionLocation(key, true);
            LOG.error("undefined row " + keyString + ", " + loc);
          }
          g = counters.getGroup("unref");
          it = g.iterator();
          while (it.hasNext()) {
            String keyString = it.next().getName();
            byte[] key = Bytes.toBytes(keyString);
            HRegionLocation loc = rl.getRegionLocation(key, true);
            LOG.error("unreferred row " + keyString + ", " + loc);
          }
        }
      }
    }
  }

  /**
   * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration
   * adds more data.
   */
  static class Loop extends Configured implements Tool {

    private static final Logger LOG = LoggerFactory.getLogger(Loop.class);
    private static final String USAGE = "Usage: Loop <num iterations> <num mappers> "
      + "<num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>"
      + " <num walker threads>] \n"
      + "where <num nodes per mapper> should be a multiple of width*wrap multiplier, 25M by default \n"
      + "walkers will select and verify random flushed loops during Generation.";

    IntegrationTestBigLinkedList it;

    protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
      Integer wrapMultiplier, Integer numWalkers) throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // create a random UUID.
      Path generatorOutput = new Path(outputPath, uuid.toString());

      Generator generator = new Generator();
      generator.setConf(getConf());
      int retCode =
        generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, numWalkers);
      if (retCode > 0) {
        throw new RuntimeException("Generator failed with return code: " + retCode);
      }
      if (numWalkers > 0) {
        if (!generator.verify()) {
          throw new RuntimeException("Generator.verify failed");
        }
      }
      LOG.info("Generator finished with success. Total nodes=" + numNodes);
    }

    protected void runVerify(String outputDir, int numReducers, long expectedNumNodes)
      throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // create a random UUID.
      Path iterationOutput = new Path(outputPath, uuid.toString());

      Verify verify = new Verify();
      verify.setConf(getConf());
      int retCode = verify.run(iterationOutput, numReducers);
      if (retCode > 0) {
        throw new RuntimeException("Verify.run failed with return code: " + retCode);
      }

      if (!verify.verify(expectedNumNodes)) {
        throw new RuntimeException("Verify.verify failed");
      }
      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 5) {
        System.err.println(USAGE);
        return 1;
      }
      try {
        int numIterations = Integer.parseInt(args[0]);
        int numMappers = Integer.parseInt(args[1]);
        long numNodes = Long.parseLong(args[2]);
        String outputDir = args[3];
        int numReducers = Integer.parseInt(args[4]);
        Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
        Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
        Integer numWalkers = (args.length < 8) ? 0 : Integer.parseInt(args[7]);

        long expectedNumNodes = 0;

        if (numIterations < 0) {
          numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
        }
        LOG.info("Running Loop with args:" + Arrays.deepToString(args));
        for (int i = 0; i < numIterations; i++) {
          LOG.info("Starting iteration = " + i);
          runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers);
          expectedNumNodes += numMappers * numNodes;
          runVerify(outputDir, numReducers, expectedNumNodes);
        }
        return 0;
      } catch (NumberFormatException e) {
        System.err.println("Parsing loop arguments failed: " + e.getMessage());
        System.err.println(USAGE);
        return 1;
      }
    }
  }
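  // A hedged example of launching the Loop command above from the shell via this class's main();
  // all argument values and the output path are illustrative, not defaults: two iterations, four
  // mappers, 25M nodes per mapper, four reducers, width 1000000, wrap multiplier 25 (so nodes per
  // mapper is a multiple of width * wrap), and one concurrent walker thread.
  //
  //   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList loop \
  //     2 4 25000000 /tmp/itbll 4 1000000 25 1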

  /**
   * A stand alone program that prints out portions of a list created by {@link Generator}
   */
  private static class Print extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      Options options = new Options();
      options.addOption("s", "start", true, "start key");
      options.addOption("e", "end", true, "end key");
      options.addOption("l", "limit", true, "number to print");

      GnuParser parser = new GnuParser();
      CommandLine cmd = null;
      try {
        cmd = parser.parse(options, args);
        if (cmd.getArgs().length != 0) {
          throw new ParseException("Command takes no arguments");
        }
      } catch (ParseException e) {
        System.err.println("Failed to parse command line " + e.getMessage());
        System.err.println();
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp(getClass().getSimpleName(), options);
        System.exit(-1);
      }

      Connection connection = ConnectionFactory.createConnection(getConf());
      Table table = connection.getTable(getTableName(getConf()));

      Scan scan = new Scan();
      scan.setBatch(10000);

      if (cmd.hasOption("s")) scan.withStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));

      if (cmd.hasOption("e")) {
        scan.withStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));
      }

      int limit = 0;
      if (cmd.hasOption("l")) limit = Integer.parseInt(cmd.getOptionValue("l"));
      else limit = 100;

      ResultScanner scanner = table.getScanner(scan);

      CINode node = new CINode();
      Result result = scanner.next();
      int count = 0;
      while (result != null && count++ < limit) {
        node = getCINode(result, node);
        System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
          Bytes.toStringBinary(node.prev), node.count, node.client);
        result = scanner.next();
      }
      scanner.close();
      table.close();
      connection.close();

      return 0;
    }
  }
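  // Example of invoking Print through this class's command dispatcher; the start key and limit
  // are illustrative (-s/-e take binary-string row keys in the Bytes.toBytesBinary format, -l
  // caps the number of nodes printed):
  //
  //   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList print -s '\x00\x01' -l 50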

  /**
   * A stand alone program that deletes a single node.
   */
  private static class Delete extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      if (args.length != 1) {
        System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
        return 0;
      }
      byte[] val = Bytes.toBytesBinary(args[0]);

      org.apache.hadoop.hbase.client.Delete delete = new org.apache.hadoop.hbase.client.Delete(val);

      try (Connection connection = ConnectionFactory.createConnection(getConf());
        Table table = connection.getTable(getTableName(getConf()))) {
        table.delete(delete);
      }

      System.out.println("Delete successful");
      return 0;
    }
  }

  abstract static class WalkerBase extends Configured {
    protected static CINode findStartNode(Table table, byte[] startKey) throws IOException {
      Scan scan = new Scan();
      scan.withStartRow(startKey);
      scan.setBatch(1);
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);

      long t1 = EnvironmentEdgeManager.currentTime();
      ResultScanner scanner = table.getScanner(scan);
      Result result = scanner.next();
      long t2 = EnvironmentEdgeManager.currentTime();
      scanner.close();

      if (result != null) {
        CINode node = getCINode(result, new CINode());
        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
        return node;
      }

      System.out.println("FSR " + (t2 - t1));

      return null;
    }

    protected CINode getNode(byte[] row, Table table, CINode node) throws IOException {
      Get get = new Get(row);
      get.addColumn(FAMILY_NAME, COLUMN_PREV);
      Result result = table.get(get);
      return getCINode(result, node);
    }
  }

  /**
   * A stand alone program that follows a linked list created by {@link Generator} and prints
   * timing info.
   */
  private static class Walker extends WalkerBase implements Tool {

    public Walker() {
    }

    @Override
    public int run(String[] args) throws IOException {

      Options options = new Options();
      options.addOption("n", "num", true, "number of queries");
      options.addOption("s", "start", true, "key to start at, binary string");
      options.addOption("l", "logevery", true, "log every N queries");

      GnuParser parser = new GnuParser();
      CommandLine cmd = null;
      try {
        cmd = parser.parse(options, args);
        if (cmd.getArgs().length != 0) {
          throw new ParseException("Command takes no arguments");
        }
      } catch (ParseException e) {
        System.err.println("Failed to parse command line " + e.getMessage());
        System.err.println();
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp(getClass().getSimpleName(), options);
        System.exit(-1);
      }

      long maxQueries = Long.MAX_VALUE;
      if (cmd.hasOption('n')) {
        maxQueries = Long.parseLong(cmd.getOptionValue("n"));
      }
      boolean isSpecificStart = cmd.hasOption('s');

      byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
      int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;

      Connection connection = ConnectionFactory.createConnection(getConf());
      Table table = connection.getTable(getTableName(getConf()));
      long numQueries = 0;
      // If isSpecificStart is set, only walk one list from that particular node.
      // Note that in case of circular (or P-shaped) list it will walk forever, as is
      // the case in normal run without startKey.
      while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
        if (!isSpecificStart) {
          startKey = new byte[ROWKEY_LENGTH];
          Bytes.random(startKey);
        }
        CINode node = findStartNode(table, startKey);
        if (node == null && isSpecificStart) {
          System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
        }
        numQueries++;
        while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) {
          byte[] prev = node.prev;
          long t1 = EnvironmentEdgeManager.currentTime();
          node = getNode(prev, table, node);
          long t2 = EnvironmentEdgeManager.currentTime();
          if (logEvery > 0 && numQueries % logEvery == 0) {
            System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
          }
          numQueries++;
          if (node == null) {
            System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
          } else if (node.prev.length == NO_KEY.length) {
            System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
          }
        }
      }
      table.close();
      connection.close();
      return 0;
    }
  }

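  // Example (argument values are illustrative) of running Walker via the command dispatcher:
  // follow lists starting from random rows, stop after 10000 gets, log timing every 100th query.
  //
  //   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList walker -n 10000 -l 100
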
  private static class Clean extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 1) {
        System.err.println("Usage: Clean <output dir>");
        return -1;
      }

      Path p = new Path(args[0]);
      Configuration conf = getConf();
      TableName tableName = getTableName(conf);
      try (FileSystem fs = HFileSystem.get(conf);
        Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin()) {
        if (admin.tableExists(tableName)) {
          admin.disableTable(tableName);
          admin.deleteTable(tableName);
        }

        if (fs.exists(p)) {
          fs.delete(p, true);
        }
      }

      return 0;
    }
  }

  static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
  }

  private static CINode getCINode(Result result, CINode node) {
    node.key = Bytes.copy(result.getRow());
    if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
    } else {
      node.prev = NO_KEY;
    }
    if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
      node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
    } else {
      node.count = -1;
    }
    if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
      node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
    } else {
      node.client = "";
    }
    return node;
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    boolean isDistributed = util.isDistributedCluster();
    util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
    if (!isDistributed) {
      util.startMiniMapReduceCluster();
    }
    this.setConf(util.getConfiguration());
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    if (util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

  private static boolean isMultiUnevenColumnFamilies(Configuration conf) {
    return conf.getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
  }

  @Test
  public void testContinuousIngest() throws IOException, Exception {
    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
    Configuration conf = getTestingUtil(getConf()).getConfiguration();
    if (isMultiUnevenColumnFamilies(getConf())) {
      // make sure per CF flush is on
      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
        FlushAllLargeStoresPolicy.class.getName());
    }
    int ret = ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
      util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
    org.junit.Assert.assertEquals(0, ret);
  }

  private void usage() {
    System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
    printCommands();
  }

  private void printCommands() {
    System.err.println("Commands:");
    System.err.println(" generator  Map only job that generates data.");
    System.err.println(" verify     A map reduce job that looks for holes. Check return code and");
    System.err.println("            look at the counts after running. Verify that REFERENCED and");
    System.err.println("            UNREFERENCED counts are ok; any UNDEFINED counts are bad. Do not");
    System.err.println("            run at the same time as the Generator.");
    System.err.println(" walker     "
      + "Standalone program that starts following a linked list & emits timing info.");
    System.err.println(" print      Standalone program that prints nodes in the linked list.");
    System.err.println(" delete     Standalone program that deletes a single node.");
    System.err.println(" loop       Program to Loop through Generator and Verify steps");
    System.err.println(" clean      Program to clean all left over detritus.");
    System.err.println(" search     Search for missing keys.");
    System.err.println("");
    System.err.println("General options:");
    System.err.println(" -D" + TABLE_NAME_KEY + "=<tableName>");
    System.err.println(
      "    Run using the <tableName> as the tablename. Defaults to " + DEFAULT_TABLE_NAME);
    System.err.println(" -D" + REGIONS_PER_SERVER_KEY + "=<# regions>");
    System.err.println("    Create table with presplit regions per server. Defaults to "
      + DEFAULT_REGIONS_PER_SERVER);

    System.err.println(" -DuseMob=<true|false>");
    System.err.println(
      "    Create table so that the mob read/write path is forced. " + "Defaults to false");

    System.err.flush();
  }

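  // A sketch of the general invocation shape described by usage()/printCommands() above. COMMAND
  // is one of the names handled in runTestFromCommandLine() below; the -D option uses the
  // TABLE_NAME_KEY property printed by the help text (shown here only as a placeholder), and the
  // table name and output dir are illustrative values:
  //
  //   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList \
  //     -D<TABLE_NAME_KEY>=MyITBLLTable clean /tmp/itbll
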
  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    String[] args = cmd.getArgs();
    // get the class, run with the conf
    if (args.length < 1) {
      printUsage(this.getClass().getSimpleName() + " <general options> COMMAND [<COMMAND options>]",
        "General options:", "");
      printCommands();
      // Have to throw an exception here to stop the processing. Looks ugly but gets message across.
      throw new RuntimeException("Incorrect Number of args.");
    }
    toRun = args[0];
    otherArgs = Arrays.copyOfRange(args, 1, args.length);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    Tool tool = null;
    if (toRun.equalsIgnoreCase("Generator")) {
      tool = new Generator();
    } else if (toRun.equalsIgnoreCase("Verify")) {
      tool = new Verify();
    } else if (toRun.equalsIgnoreCase("Loop")) {
      Loop loop = new Loop();
      loop.it = this;
      tool = loop;
    } else if (toRun.equalsIgnoreCase("Walker")) {
      tool = new Walker();
    } else if (toRun.equalsIgnoreCase("Print")) {
      tool = new Print();
    } else if (toRun.equalsIgnoreCase("Delete")) {
      tool = new Delete();
    } else if (toRun.equalsIgnoreCase("Clean")) {
      tool = new Clean();
    } else if (toRun.equalsIgnoreCase("Search")) {
      tool = new Search();
    } else {
      usage();
      throw new RuntimeException("Unknown arg");
    }

    return ToolRunner.run(getConf(), tool, otherArgs);
  }

  @Override
  public TableName getTablename() {
    Configuration c = getConf();
    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    if (isMultiUnevenColumnFamilies(getConf())) {
      return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
        Bytes.toString(TINY_FAMILY_NAME));
    } else {
      return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
    }
  }

  private static void setJobConf(Job job, int numMappers, long numNodes, Integer width,
    Integer wrapMultiplier, Integer numWalkers) {
    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
    if (width != null) {
      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
    }
    if (wrapMultiplier != null) {
      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
    }
    if (numWalkers != null) {
      job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers);
    }
  }

  public static void setJobScannerConf(Job job) {
    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
    System.exit(ret);
  }
}