/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import static org.apache.hadoop.hbase.IntegrationTestingUtility.DEFAULT_REGIONS_PER_SERVER;
import static org.apache.hadoop.hbase.IntegrationTestingUtility.PRESPLIT_TEST_TABLE;
import static org.apache.hadoop.hbase.IntegrationTestingUtility.PRESPLIT_TEST_TABLE_KEY;
import static org.apache.hadoop.hbase.IntegrationTestingUtility.REGIONS_PER_SERVER_KEY;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionConfiguration;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.regionserver.FlushAllLargeStoresPolicy;
import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Random64;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

/**
 * <p>
 * This is an integration test borrowed from goraci, written by Keith Turner, which is in turn
 * inspired by the Accumulo test called continuous ingest (ci). The original source code can be
 * found here:
 * <ul>
 * <li>https://github.com/keith-turner/goraci</li>
 * <li>https://github.com/enis/goraci/</li>
 * </ul>
 * </p>
 * <p>
 * Apache Accumulo [0] has a simple test suite that verifies that data is not lost at scale. This
 * test suite is called continuous ingest. This test runs many ingest clients that continually
 * create linked lists containing 25 million nodes. At some point the clients are stopped and a
 * MapReduce job is run to ensure no linked list has a hole. A hole indicates data was lost.
 * </p>
 * <p>
 * The nodes in the linked list are random. This causes each linked list to spread across the table.
 * Therefore if one part of a table loses data, then it will be detected by references in another
 * part of the table.
 * </p>
 * <p>
 * <h3>THE ANATOMY OF THE TEST</h3> Below is a rough sketch of how data is written. For specific
 * details look at the Generator code.
 * </p>
 * <p>
 * <ol>
 * <li>Write out 1 million nodes (1M is the configurable 'width' mentioned below)</li>
 * <li>Flush the client</li>
 * <li>Write out 1 million more nodes that reference the previous million</li>
 * <li>If this is the 25th set of 1 million nodes, then update the 1st set of 1 million to point to
 * the last (25 is configurable; it's the 'wrap multiplier' referred to below)</li>
 * <li>goto 1</li>
 * </ol>
 * </p>
 * <p>
 * The key is that nodes only reference flushed nodes. Therefore a node should never reference a
 * missing node, even if the ingest client is killed at any point in time.
 * </p>
 * <p>
 * When running this test suite with Accumulo there is a script running in parallel called the
 * Agitator that randomly and continuously kills server processes. The outcome was that many data
 * loss bugs were found in Accumulo by doing this. This test suite can also help find bugs that
 * impact uptime and stability when run for days or weeks.
 * </p>
 * <p>
 * This test suite consists of the following:
 * <ul>
 * <li>a few Java programs</li>
 * <li>a little helper script to run the Java programs</li>
 * <li>a Maven script to build it</li>
 * </ul>
 * </p>
 * <p>
 * When generating data, it's best to have each map task generate a multiple of 25 million nodes.
 * The reason for this is that circular linked lists are generated every 25M. Not generating a
 * multiple of 25M will result in some nodes in the linked list not having references. The loss of
 * an unreferenced node cannot be detected.
 * </p>
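 * <p>
 * For example, with the default 'width' of 1M and the default 'wrap multiplier' of 25, valid
 * per-map-task node counts are 25M, 50M, 75M, and so on.
 * </p>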
 * <p>
 * <h3>Below is a description of the Java programs</h3>
 * <ul>
 * <li>{@code Generator} - A map only job that generates data. As stated previously, it's best to
 * generate data in multiples of 25M. An option is also available to allow concurrent walkers to
 * select and walk random flushed loops during this phase.</li>
 * <li>{@code Verify} - A map reduce job that looks for holes. Look at the counts after running.
 * {@code REFERENCED} and {@code UNREFERENCED} are ok, any {@code UNDEFINED} counts are bad. Do not
 * run at the same time as the Generator.</li>
 * <li>{@code Walker} - A standalone program that starts following a linked list and emits timing
 * info.</li>
 * <li>{@code Print} - A standalone program that prints nodes in the linked list</li>
 * <li>{@code Delete} - A standalone program that deletes a single node</li>
 * </ul>
 * This class can be run as a unit test, as an integration test, or from the command line.
 * </p>
 * <p>
 * ex:
 *
 * <pre>
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
 *    loop 2 1 100000 /temp 1 1000 50 1 0
 * </pre>
 * </p>
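 * <p>
 * For illustration only, the same invocation can be expressed programmatically through the
 * standard Hadoop {@code ToolRunner}, since this class is a {@code Tool} (the argument values
 * below simply mirror the command line above):
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(),
 *   new String[] { "loop", "2", "1", "100000", "/temp", "1", "1000", "50", "1", "0" });
 * </pre>
 * </p>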
 */
@Category(IntegrationTests.class)
public class IntegrationTestBigLinkedList extends IntegrationTestBase {
  protected static final byte[] NO_KEY = new byte[1];
  protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
  protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
  protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
  private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
  private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");

  // link to the id of the prev node in the linked list
  protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");

  // identifier of the mapred task that generated this row
  protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");

  // the id of the row within the same client.
  protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");

  /** How many rows to write per map task. This has to be a multiple of 25M */
  private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY =
    "IntegrationTestBigLinkedList.generator.num_rows";

  private static final String GENERATOR_NUM_MAPPERS_KEY =
    "IntegrationTestBigLinkedList.generator.map.tasks";

  private static final String GENERATOR_WIDTH_KEY = "IntegrationTestBigLinkedList.generator.width";

  private static final String GENERATOR_WRAP_KEY = "IntegrationTestBigLinkedList.generator.wrap";

  private static final String CONCURRENT_WALKER_KEY =
    "IntegrationTestBigLinkedList.generator.concurrentwalkers";

  protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster

  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters

  private static final int WIDTH_DEFAULT = 1000000;

  /**
   * The 'wrap multiplier' default.
   */
  private static final int WRAP_DEFAULT = 25;
  private static final int ROWKEY_LENGTH = 16;

  private static final int CONCURRENT_WALKER_DEFAULT = 0;

  protected String toRun;
  protected String[] otherArgs;

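  /**
   * A node in the generated linked list: {@code key} is the row's 16-byte random row key,
   * {@code prev} is the row key of the node it references, and {@code client}/{@code count}
   * record which mapred task wrote the row and its sequence number within that client.
   */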
  static class CINode {
    byte[] key;
    byte[] prev;
    String client;
    long count;
  }

  /**
   * A Map only job that generates random linked lists and stores them.
   */
  static class Generator extends Configured implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(Generator.class);

    /**
     * Set this configuration if you want to test that single-column-family flush works. If set, we
     * will add a big column family and a small column family on either side of the usual ITBLL
     * 'meta' column family. When we write out the ITBLL, we will also add to the big column family
     * a value bigger than that for ITBLL and, for the small one, something way smaller. The idea is
     * that when flush-by-column-family rather than by-region is enabled, we can see if ITBLL is
     * broken in any way. Here is how you would pass it:
     * <p>
     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
     * -Dgenerator.multiple.columnfamilies=true generator 1 10 g
     */
    public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
      "generator.multiple.columnfamilies";

    /**
     * Set this configuration if you want to scale up the size of test data quickly.
     * <p>
     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
     * -Dgenerator.big.family.value.size=1024 generator 1 10 output
     */
    public static final String BIG_FAMILY_VALUE_SIZE_KEY = "generator.big.family.value.size";

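    /**
     * Counters incremented by the concurrent walkers that may run during generation; checked by
     * {@link #verify()} after the job completes.
     */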
    public static enum GeneratorCounts {
      SUCCESS,
      TERMINATING,
      UNDEFINED,
      IOEXCEPTION
    }

    public static final String USAGE = "Usage : " + Generator.class.getSimpleName()
      + " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>"
      + " <num walker threads>] \n"
      + "Where <num nodes per map> should be a multiple of 'width' * 'wrap multiplier'.\n"
      + "25M is default because default 'width' is 1M and default 'wrap multiplier' is 25.\n"
      + "We write out 1M nodes and then flush the client. After 25 flushes, we connect \n"
      + "first written nodes back to the 25th set.\n"
      + "Walkers verify random flushed loops during Generation.";

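    /** The last job run by this generator; kept so {@link #verify()} can read its counters. */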
    public Job job;

    static class GeneratorInputFormat extends InputFormat<BytesWritable, NullWritable> {
      static class GeneratorInputSplit extends InputSplit implements Writable {
        @Override
        public long getLength() throws IOException, InterruptedException {
          return 1;
        }

        @Override
        public String[] getLocations() throws IOException, InterruptedException {
          return new String[0];
        }

        @Override
        public void readFields(DataInput arg0) throws IOException {
        }

        @Override
        public void write(DataOutput arg0) throws IOException {
        }
      }

      static class GeneratorRecordReader extends RecordReader<BytesWritable, NullWritable> {
        private long count;
        private long numNodes;
        // Use Random64 to avoid issue described in HBASE-21256.
        private Random64 rand = new Random64();

        @Override
        public void close() throws IOException {
        }

        @Override
        public BytesWritable getCurrentKey() throws IOException, InterruptedException {
          byte[] bytes = new byte[ROWKEY_LENGTH];
          rand.nextBytes(bytes);
          return new BytesWritable(bytes);
        }

        @Override
        public NullWritable getCurrentValue() throws IOException, InterruptedException {
          return NullWritable.get();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
          return (float) (count / (double) numNodes);
        }

        @Override
        public void initialize(InputSplit arg0, TaskAttemptContext context)
          throws IOException, InterruptedException {
          numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
          return count++ < numNodes;
        }
      }

      @Override
      public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
        TaskAttemptContext context) throws IOException, InterruptedException {
        GeneratorRecordReader rr = new GeneratorRecordReader();
        rr.initialize(split, context);
        return rr;
      }

      @Override
      public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);

        ArrayList<InputSplit> splits = new ArrayList<>(numMappers);

        for (int i = 0; i < numMappers; i++) {
          splits.add(new GeneratorInputSplit());
        }

        return splits;
      }
    }

    /** Ensure output files from prev-job go to map inputs for current job */
    static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
      @Override
      protected boolean isSplitable(JobContext context, Path filename) {
        return false;
      }
    }

    /**
     * Some ASCII art time:
     * <p>
     * [ . . . ] represents one batch of random longs of length WIDTH
     *
     * <pre>
     *                _________________________
     *               |                  ______ |
     *               |                 |      ||
     *             .-+-----------------+-----.||
     *             | |                 |     |||
     * first   = [ . . . . . . . . . . . ]   |||
     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
     *             | | | | | | | | | | |     |||
     * prev    = [ . . . . . . . . . . . ]   |||
     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
     *             | | | | | | | | | | |     |||
     * current = [ . . . . . . . . . . . ]   |||
     *                                       |||
     * ...                                   |||
     *                                       |||
     * last    = [ . . . . . . . . . . . ]   |||
     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____|||
     *             |                 |________||
     *             |___________________________|
     * </pre>
     */
    static class GeneratorMapper
      extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {

      byte[][] first = null;
      byte[][] prev = null;
      byte[][] current = null;
      byte[] id;
      long count = 0;
      int i;
      BufferedMutator mutator;
      Connection connection;
      long numNodes;
      long wrap;
      int width;
      boolean multipleUnevenColumnFamilies;
      byte[] tinyValue = new byte[] { 't' };
      byte[] bigValue = null;
      Configuration conf;
      // Use Random64 to avoid issue described in HBASE-21256.
      private Random64 rand = new Random64();

      volatile boolean walkersStop;
      int numWalkers;
      final Object flushedLoopsLock = new Object();
      volatile List<Long> flushedLoops = new ArrayList<>();
      List<Thread> walkers = new ArrayList<>();

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        id = Bytes.toBytes("Job: " + context.getJobID() + " Task: " + context.getTaskAttemptID());
        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
        instantiateHTable();
        this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
        current = new byte[this.width][];
        int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
        this.wrap = (long) wrapMultiplier * width;
        this.numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY,
          (long) WIDTH_DEFAULT * WRAP_DEFAULT);
        if (this.numNodes < this.wrap) {
          this.wrap = this.numNodes;
        }
        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
        this.numWalkers =
          context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT);
        this.walkersStop = false;
        this.conf = context.getConfiguration();

        if (multipleUnevenColumnFamilies) {
          int n = context.getConfiguration().getInt(BIG_FAMILY_VALUE_SIZE_KEY, 256);
          int limit =
            context.getConfiguration().getInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
              ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT);

          Preconditions.checkArgument(n <= limit, "%s(%s) > %s(%s)", BIG_FAMILY_VALUE_SIZE_KEY, n,
            ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, limit);

          bigValue = new byte[n];
          rand.nextBytes(bigValue);
          LOG.info("Create a bigValue with " + n + " bytes.");
        }

        Preconditions.checkArgument(numNodes > 0, "numNodes(%s) <= 0", numNodes);
        Preconditions.checkArgument(numNodes % width == 0, "numNodes(%s) mod width(%s) != 0",
          numNodes, width);
        Preconditions.checkArgument(numNodes % wrap == 0, "numNodes(%s) mod wrap(%s) != 0",
          numNodes, wrap);
      }

      protected void instantiateHTable() throws IOException {
        mutator = connection
          .getBufferedMutator(new BufferedMutatorParams(getTableName(connection.getConfiguration()))
            .writeBufferSize(4 * 1024 * 1024));
      }

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        joinWalkers();
        mutator.close();
        connection.close();
      }

      @Override
      protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
        current[i] = new byte[key.getLength()];
        System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
        if (++i == current.length) {
          LOG.debug("Persisting current.length={}, count={}, id={}, current={}, i={}",
            current.length, count, Bytes.toStringBinary(id), Bytes.toStringBinary(current[0]), i);
          persist(output, count, prev, current, id);
          i = 0;

          if (first == null) {
            first = current;
          }
          prev = current;
          current = new byte[this.width][];

          count += current.length;
          output.setStatus("Count " + count);

          if (count % wrap == 0) {
            // this block of code turns the 1 million linked lists of length 25 into one giant
            // circular linked list of 25 million nodes
            circularLeftShift(first);
            persist(output, -1, prev, first, null);
            // At this point the entire loop has been flushed so we can add one of its nodes to the
            // concurrent walker
            if (numWalkers > 0) {
              addFlushed(key.getBytes());
              if (walkers.isEmpty()) {
                startWalkers(numWalkers, conf, output);
              }
            }
            first = null;
            prev = null;
          }
        }
      }

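      /**
       * Rotates the array left by one position, e.g. [a, b, c, d] becomes [b, c, d, a]. The
       * one-slot offset is what stitches the 'width' parallel chains into a single circular list
       * when the shifted first batch is re-persisted pointing at the last batch.
       */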
      private static <T> void circularLeftShift(T[] first) {
        T ez = first[0];
        System.arraycopy(first, 1, first, 0, first.length - 1);
        first[first.length - 1] = ez;
      }

      private void addFlushed(byte[] rowKey) {
        synchronized (flushedLoopsLock) {
          flushedLoops.add(Bytes.toLong(rowKey));
          flushedLoopsLock.notifyAll();
        }
      }

      protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
        throws IOException {
        for (int i = 0; i < current.length; i++) {

          if (i % 100 == 0) {
            // Tickle progress every so often else maprunner will think us hung
            output.progress();
          }

          Put put = new Put(current[i]);
          put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);

          if (count >= 0) {
            put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
          }
          if (id != null) {
            put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
          }
          // See if we are to write multiple columns.
          if (this.multipleUnevenColumnFamilies) {
            // Use any column name.
            put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue);
            // Use any column name.
            put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue);
          }
          mutator.mutate(put);
        }

        mutator.flush();
      }

      private void startWalkers(int numWalkers, Configuration conf, Context context) {
        LOG.info("Starting " + numWalkers + " concurrent walkers");
        for (int i = 0; i < numWalkers; i++) {
          Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context));
          walker.start();
          walkers.add(walker);
        }
      }

      private void joinWalkers() {
        synchronized (flushedLoopsLock) {
          walkersStop = true;
          flushedLoopsLock.notifyAll();
        }
        for (Thread walker : walkers) {
          Uninterruptibles.joinUninterruptibly(walker);
        }
      }

      /**
       * Randomly selects and walks a random flushed loop concurrently with the Generator Mapper by
       * spawning ConcurrentWalkers with specified start nodes. These ConcurrentWalkers are
       * configured to only log erroneous nodes.
       */
      public class ContinuousConcurrentWalker implements Runnable {

        ConcurrentWalker walker;
        Configuration conf;
        Context context;

        public ContinuousConcurrentWalker(Configuration conf, Context context) {
          this.conf = conf;
          this.context = context;
        }

        @Override
        public void run() {
          while (!walkersStop) {
            try {
              long node = selectLoop();
              try {
                walkLoop(node);
              } catch (IOException e) {
                context.getCounter(GeneratorCounts.IOEXCEPTION).increment(1L);
                return;
              }
            } catch (InterruptedException e) {
              return;
            }
          }
        }

        private void walkLoop(long node) throws IOException {
          walker = new ConcurrentWalker(context);
          walker.setConf(conf);
          walker.run(node, wrap);
        }

        private long selectLoop() throws InterruptedException {
          synchronized (flushedLoopsLock) {
            while (flushedLoops.isEmpty() && !walkersStop) {
              flushedLoopsLock.wait();
            }
            if (walkersStop) {
              throw new InterruptedException();
            }
            return flushedLoops.get(ThreadLocalRandom.current().nextInt(flushedLoops.size()));
          }
        }
      }

      public static class ConcurrentWalker extends WalkerBase {

        Context context;

        public ConcurrentWalker(Context context) {
          this.context = context;
        }

        public void run(long startKeyIn, long maxQueriesIn) throws IOException {

          long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE;
          byte[] startKey = Bytes.toBytes(startKeyIn);

          Connection connection = ConnectionFactory.createConnection(getConf());
          Table table = connection.getTable(getTableName(getConf()));
          long numQueries = 0;
          // We only walk one list, from the given start node. Note that in the case of a circular
          // (or P-shaped) list the walk continues until maxQueries is reached, as it would in a
          // normal run without a start key.

          CINode node = findStartNode(table, startKey);
          if (node == null) {
            LOG.error("Start node not found: " + Bytes.toStringBinary(startKey));
            throw new IOException("Start node not found: " + startKeyIn);
          }
          while (numQueries < maxQueries) {
            numQueries++;
            byte[] prev = node.prev;
            node = getNode(prev, table, node);
            if (node == null) {
              LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev));
              context.getCounter(GeneratorCounts.UNDEFINED).increment(1L);
            } else if (node.prev.length == NO_KEY.length) {
              LOG.error(
                "ConcurrentWalker found TERMINATING NODE: " + Bytes.toStringBinary(node.key));
              context.getCounter(GeneratorCounts.TERMINATING).increment(1L);
            } else {
              // Increment for successful walk
              context.getCounter(GeneratorCounts.SUCCESS).increment(1L);
            }
          }
          table.close();
          connection.close();
        }
      }
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 3) {
        System.err.println(USAGE);
        return 1;
      }
      try {
        int numMappers = Integer.parseInt(args[0]);
        long numNodes = Long.parseLong(args[1]);
        Path tmpOutput = new Path(args[2]);
        Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
        Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
        Integer numWalkers = (args.length < 6) ? null : Integer.parseInt(args[5]);
        return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
      } catch (NumberFormatException e) {
        System.err.println("Parsing generator arguments failed: " + e.getMessage());
        System.err.println(USAGE);
        return 1;
      }
    }

    protected void createSchema() throws IOException {
      Configuration conf = getConf();
      TableName tableName = getTableName(conf);
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
        if (!admin.tableExists(tableName)) {
          TableDescriptor tableDescriptor = TableDescriptorBuilder
            .newBuilder(getTableName(getConf()))
            // if -DuseMob=true force all data through mob path.
            .setColumnFamily(
              setMobProperties(conf, ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME)).build())
            // Always add these families. Just skip writing to them when we do not test per CF
            // flush.
            .setColumnFamily(
              setMobProperties(conf, ColumnFamilyDescriptorBuilder.newBuilder(BIG_FAMILY_NAME))
                .build())
            .setColumnFamily(
              setMobProperties(conf, ColumnFamilyDescriptorBuilder.newBuilder(TINY_FAMILY_NAME))
                .build())
            .build();

          // If we want to pre-split compute how many splits.
          if (conf.getBoolean(PRESPLIT_TEST_TABLE_KEY, PRESPLIT_TEST_TABLE)) {
            int numberOfServers = admin.getRegionServers().size();
            if (numberOfServers == 0) {
              throw new IllegalStateException("No live regionservers");
            }
            int regionsPerServer = conf.getInt(REGIONS_PER_SERVER_KEY, DEFAULT_REGIONS_PER_SERVER);
            int totalNumberOfRegions = numberOfServers * regionsPerServer;
            LOG.info("Number of live regionservers: " + numberOfServers + ", "
              + "pre-splitting table into " + totalNumberOfRegions + " regions "
              + "(default regions per server: " + regionsPerServer + ")");

            byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);

            admin.createTable(tableDescriptor, splits);
          } else {
            // Looks like we're just letting things play out.
            // Create a table with one region by default.
            // This will make the splitting work hard.
            admin.createTable(tableDescriptor);
          }
        }
      } catch (MasterNotRunningException e) {
        LOG.error("Master not running", e);
        throw new IOException(e);
      }
    }

    public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput, Integer width,
      Integer wrapMultiplier, Integer numWalkers) throws Exception {
      LOG.info(
        "Running RandomInputGenerator with numMappers=" + numMappers + ", numNodes=" + numNodes);
      Job job = Job.getInstance(getConf());

      job.setJobName("Random Input Generator");
      job.setNumReduceTasks(0);
      job.setJarByClass(getClass());

      job.setInputFormatClass(GeneratorInputFormat.class);
      job.setOutputKeyClass(BytesWritable.class);
      job.setOutputValueClass(NullWritable.class);

      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);

      job.setMapperClass(Mapper.class); // identity mapper

      FileOutputFormat.setOutputPath(job, tmpOutput);
      job.setOutputFormatClass(SequenceFileOutputFormat.class);
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Random64.class);

      boolean success = jobCompletion(job);

      return success ? 0 : 1;
    }

    public int runGenerator(int numMappers, long numNodes, Path tmpOutput, Integer width,
      Integer wrapMultiplier, Integer numWalkers) throws Exception {
      LOG.info("Running Generator with numMappers=" + numMappers + ", numNodes=" + numNodes);
      createSchema();
      job = Job.getInstance(getConf());

      job.setJobName("Link Generator");
      job.setNumReduceTasks(0);
      job.setJarByClass(getClass());

      FileInputFormat.setInputPaths(job, tmpOutput);
      job.setInputFormatClass(OneFilePerMapperSFIF.class);
      job.setOutputKeyClass(NullWritable.class);
      job.setOutputValueClass(NullWritable.class);

      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);

      setMapperForGenerator(job);

      job.setOutputFormatClass(NullOutputFormat.class);

      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
      TableMapReduceUtil.addDependencyJars(job);
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        AbstractHBaseTool.class);
      TableMapReduceUtil.initCredentials(job);

      boolean success = jobCompletion(job);

      return success ? 0 : 1;
    }

    protected boolean jobCompletion(Job job)
      throws IOException, InterruptedException, ClassNotFoundException {
      boolean success = job.waitForCompletion(true);
      return success;
    }

    protected void setMapperForGenerator(Job job) {
      job.setMapperClass(GeneratorMapper.class);
    }

    public int run(int numMappers, long numNodes, Path tmpOutput, Integer width,
      Integer wrapMultiplier, Integer numWalkers) throws Exception {
      int ret =
        runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
      if (ret > 0) {
        return ret;
      }
      return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
    }

    public boolean verify() {
      try {
        Counters counters = job.getCounters();
        if (counters == null) {
          LOG.info("Counters object was null, Generator verification cannot be performed."
            + " This is commonly a result of insufficient YARN configuration.");
          return false;
        }

        if (
          counters.findCounter(GeneratorCounts.TERMINATING).getValue() > 0
            || counters.findCounter(GeneratorCounts.UNDEFINED).getValue() > 0
            || counters.findCounter(GeneratorCounts.IOEXCEPTION).getValue() > 0
        ) {
          LOG.error("Concurrent walker failed to verify during Generation phase");
          LOG.error(
            "TERMINATING nodes: " + counters.findCounter(GeneratorCounts.TERMINATING).getValue());
          LOG.error(
            "UNDEFINED nodes: " + counters.findCounter(GeneratorCounts.UNDEFINED).getValue());
          LOG.error(
            "IOEXCEPTION nodes: " + counters.findCounter(GeneratorCounts.IOEXCEPTION).getValue());
          return false;
        }
      } catch (IOException e) {
        LOG.info("Generator verification could not find counter");
        return false;
      }
      return true;
    }
  }

  private static ColumnFamilyDescriptorBuilder setMobProperties(final Configuration conf,
    final ColumnFamilyDescriptorBuilder builder) {
    if (conf.getBoolean("useMob", false)) {
      builder.setMobEnabled(true);
      builder.setMobThreshold(4);
    }
    return builder;
  }

  /**
   * Tool to search for missing rows in WALs and hfiles. Pass in a file or dir of keys to search
   * for. The key file must have been written by the Verify step (we depend on the format it writes
   * out). We'll read the keys in and then search the hbase WALs and oldWALs dirs (some of this is
   * TODO).
   */
  static class Search extends Configured implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(Search.class);
    protected Job job;

    private static void printUsage(final String error) {
      if (error != null && error.length() > 0) System.err.println("ERROR: " + error);
      System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]");
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 1 || args.length > 2) {
        printUsage(null);
        return 1;
      }
      Path inputDir = new Path(args[0]);
      int numMappers = 1;
      if (args.length > 1) {
        numMappers = Integer.parseInt(args[1]);
      }
      return run(inputDir, numMappers);
    }

    /**
     * WALPlayer override that searches for keys loaded in the setup.
     */
    public static class WALSearcher extends WALPlayer {
      public WALSearcher(Configuration conf) {
        super(conf);
      }

      /**
       * The actual searcher mapper.
       */
      public static class WALMapperSearcher extends WALMapper {
        private SortedSet<byte[]> keysToFind;
        private AtomicInteger rows = new AtomicInteger(0);

        @Override
        public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
          throws IOException {
          super.setup(context);
          try {
            this.keysToFind = readKeysToSearch(context.getConfiguration());
            LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
          } catch (InterruptedException e) {
            throw new InterruptedIOException(e.toString());
          }
        }

        @Override
        protected boolean filter(Context context, Cell cell) {
          // TODO: Can I do a better compare than this copying out key?
          byte[] row = new byte[cell.getRowLength()];
          System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
          boolean b = this.keysToFind.contains(row);
          if (b) {
            String keyStr = Bytes.toStringBinary(row);
            try {
              LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
            } catch (IOException | InterruptedException e) {
              LOG.warn(e.toString(), e);
            }
            if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
              context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
            }
            context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
          }
          return b;
        }
      }

      // Put in place the above WALMapperSearcher.
      @Override
      public Job createSubmittableJob(String[] args) throws IOException {
        Job job = super.createSubmittableJob(args);
        // Call my class instead.
        job.setJarByClass(WALMapperSearcher.class);
        job.setMapperClass(WALMapperSearcher.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        return job;
      }
    }

    static final String FOUND_GROUP_KEY = "Found";
    static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";

    public int run(Path inputDir, int numMappers) throws Exception {
      getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
      SortedSet<byte[]> keys = readKeysToSearch(getConf());
      if (keys.isEmpty()) throw new RuntimeException("No keys to find");
      LOG.info("Count of keys to find: " + keys.size());
      for (byte[] key : keys)
        LOG.info("Key: " + Bytes.toStringBinary(key));
      // Now read all WALs. In two dirs. Presumes certain layout.
      Path walsDir =
        new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
      Path oldWalsDir =
        new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
      LOG.info("Running Search with keys inputDir=" + inputDir + ", numMappers=" + numMappers
        + " against " + getConf().get(HConstants.HBASE_DIR));
      int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()),
        new String[] { walsDir.toString(), "" });
      if (ret != 0) {
        return ret;
      }
      return ToolRunner.run(getConf(), new WALSearcher(getConf()),
        new String[] { oldWalsDir.toString(), "" });
    }

    static SortedSet<byte[]> readKeysToSearch(final Configuration conf)
      throws IOException, InterruptedException {
      Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
      FileSystem fs = FileSystem.get(conf);
      SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
      if (!fs.exists(keysInputDir)) {
        throw new FileNotFoundException(keysInputDir.toString());
      }
      if (!fs.isDirectory(keysInputDir)) {
        throw new UnsupportedOperationException("TODO");
      } else {
        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
        while (iterator.hasNext()) {
          LocatedFileStatus keyFileStatus = iterator.next();
          // Skip "_SUCCESS" file.
          if (keyFileStatus.getPath().getName().startsWith("_")) continue;
          result.addAll(readFileToSearch(conf, keyFileStatus));
        }
      }
      return result;
    }

    private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
      final LocatedFileStatus keyFileStatus) throws IOException, InterruptedException {
      SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
      // Return entries that are flagged VerifyCounts.UNDEFINED in the value. Return the row; this
      // is what is missing.
      TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
      try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
        new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
        InputSplit is =
          new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String[] {});
        rr.initialize(is, context);
        while (rr.nextKeyValue()) {
          rr.getCurrentKey();
          BytesWritable bw = rr.getCurrentValue();
          if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.VerifyCounts.UNDEFINED) {
            byte[] key = new byte[rr.getCurrentKey().getLength()];
            System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0,
              rr.getCurrentKey().getLength());
            result.add(key);
          }
        }
      }
      return result;
    }
  }

  /**
   * A Map Reduce job that verifies that the linked lists generated by {@link Generator} do not have
   * any holes.
   */
  static class Verify extends Configured implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(Verify.class);
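    // Single-byte 'define' markers emitted once per existing row by VerifyMapper: 0 = row
    // present, 1 = row present but missing the big/tiny column families.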
    protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 });
    protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 });
    protected Job job;

    public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
      private BytesWritable row = new BytesWritable();
      private BytesWritable ref = new BytesWritable();
      private boolean multipleUnevenColumnFamilies;

      @Override
      protected void
        setup(Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context)
          throws IOException, InterruptedException {
        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
      }

      @Override
      protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
        byte[] rowKey = key.get();
        row.set(rowKey, 0, rowKey.length);
        if (
          multipleUnevenColumnFamilies && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME)
            || !value.containsColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME))
        ) {
          context.write(row, DEF_LOST_FAMILIES);
        } else {
          context.write(row, DEF);
        }
        byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
        if (prev != null && prev.length > 0) {
          ref.set(prev, 0, prev.length);
          context.write(ref, row);
        } else {
          LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
        }
      }
    }

    /**
     * Don't change the order of these enums. Their ordinals are used as a type flag when we emit
     * problems found from the reducer.
     */
    public static enum VerifyCounts {
      UNREFERENCED,
      UNDEFINED,
      REFERENCED,
      CORRUPT,
      EXTRAREFERENCES,
      EXTRA_UNDEF_REFERENCES,
      LOST_FAMILIES
    }

    /**
     * Per reducer, we output problem rows as byte arrays so they can be used as input for
     * subsequent investigative mapreduce jobs. Each emitted value is prefaced by a one byte flag
     * saying what sort of emission it is. The flag is the {@link VerifyCounts} enum ordinal as a
     * short.
     */
    public static class VerifyReducer
      extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
      private ArrayList<byte[]> refs = new ArrayList<>();
      private final BytesWritable UNREF =
        new BytesWritable(addPrefixFlag(VerifyCounts.UNREFERENCED.ordinal(), new byte[] {}));
      private final BytesWritable LOSTFAM =
        new BytesWritable(addPrefixFlag(VerifyCounts.LOST_FAMILIES.ordinal(), new byte[] {}));

      private AtomicInteger rows = new AtomicInteger(0);
      private Connection connection;

      @Override
      protected void
        setup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
          throws IOException, InterruptedException {
        super.setup(context);
        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
      }

      @Override
      protected void
        cleanup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
          throws IOException, InterruptedException {
        if (this.connection != null) {
          this.connection.close();
        }
        super.cleanup(context);
      }

      /**
       * Returns new byte array that has <code>ordinal</code> as prefix on front taking up
       * Bytes.SIZEOF_SHORT bytes followed by <code>r</code>
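       * <pre>
       * // Illustrative example (the short prefix is big-endian):
       * addPrefixFlag(2, new byte[] { 0x0a, 0x0b }) == { 0x00, 0x02, 0x0a, 0x0b }
       * </pre>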
       */
      public static byte[] addPrefixFlag(final int ordinal, final byte[] r) {
        byte[] prefix = Bytes.toBytes((short) ordinal);
        if (prefix.length != Bytes.SIZEOF_SHORT) {
          throw new RuntimeException("Unexpected size: " + prefix.length);
        }
        byte[] result = new byte[prefix.length + r.length];
        System.arraycopy(prefix, 0, result, 0, prefix.length);
        System.arraycopy(r, 0, result, prefix.length, r.length);
        return result;
      }

      /**
       * Returns type from the Counts enum of this row. Reads prefix added by
       * {@link #addPrefixFlag(int, byte[])}
       */
      public static VerifyCounts whichType(final byte[] bs) {
        int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
        return VerifyCounts.values()[ordinal];
      }

      /** Returns Row bytes minus the type flag. */
      public static byte[] getRowOnly(BytesWritable bw) {
        byte[] bytes = new byte[bw.getLength() - Bytes.SIZEOF_SHORT];
        System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
        return bytes;
      }

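      /*
       * For each row key the reducer sees the union of 'define' markers (DEF/DEF_LOST_FAMILIES,
       * emitted because the row itself exists) and references (row keys of nodes whose 'prev'
       * points at this row). A healthy node has exactly one define and one reference; no define
       * means the row was lost (UNDEFINED), no reference means an unlinked node (UNREFERENCED).
       */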
      @Override
      public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
        throws IOException, InterruptedException {
        int defCount = 0;
        boolean lostFamilies = false;
        refs.clear();
        for (BytesWritable type : values) {
          if (type.getLength() == DEF.getLength()) {
            defCount++;
            if (type.getBytes()[0] == 1) {
              lostFamilies = true;
            }
          } else {
            byte[] bytes = new byte[type.getLength()];
            System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
            refs.add(bytes);
          }
        }

        // TODO check for more than one def, should not happen
        StringBuilder refsSb = null;
        if (defCount == 0 || refs.size() != 1) {
          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
          refsSb = dumpExtraInfoOnRefs(key, context, refs);
          LOG.error("LinkedListError: key=" + keyString + ", reference(s)="
            + (refsSb != null ? refsSb.toString() : ""));
        }
        if (lostFamilies) {
          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
          LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
          context.getCounter(VerifyCounts.LOST_FAMILIES).increment(1);
          context.write(key, LOSTFAM);
        }

        if (defCount == 0 && refs.size() > 0) {
          // This is bad, found a node that is referenced but not defined. It must have been
          // lost, emit some info about this node for debugging purposes.
          // Write out a line per reference. If more than one, flag it.
          for (int i = 0; i < refs.size(); i++) {
            byte[] bs = refs.get(i);
            int ordinal;
            if (i <= 0) {
              ordinal = VerifyCounts.UNDEFINED.ordinal();
              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
              context.getCounter(VerifyCounts.UNDEFINED).increment(1);
            } else {
              ordinal = VerifyCounts.EXTRA_UNDEF_REFERENCES.ordinal();
              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
            }
          }
          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            // Print out missing row; doing get on reference gives info on when the referencer
            // was added which can help a little debugging. This info is only available in mapper
            // output -- the 'LinkedListError: key=...' log message above. What we emit here is
            // useless for debugging.
            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
            context.getCounter("undef", keyString).increment(1);
          }
1250        } else if (defCount > 0 && refs.isEmpty()) {
1251          // node is defined but not referenced
1252          context.write(key, UNREF);
1253          context.getCounter(VerifyCounts.UNREFERENCED).increment(1);
1254          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
1255            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1256            context.getCounter("unref", keyString).increment(1);
1257          }
1258        } else {
1259          if (refs.size() > 1) {
1260            // Skip first reference.
1261            for (int i = 1; i < refs.size(); i++) {
1262              context.write(key, new BytesWritable(
1263                addPrefixFlag(VerifyCounts.EXTRAREFERENCES.ordinal(), refs.get(i))));
1264            }
1265            context.getCounter(VerifyCounts.EXTRAREFERENCES).increment(refs.size() - 1);
1266          }
1267          // node is defined and referenced
1268          context.getCounter(VerifyCounts.REFERENCED).increment(1);
1269        }
1270      }
1271
1272      /**
1273       * Dump out extra info around references if there are any. Helps debugging.
1274       * @return StringBuilder filled with references if any.
1275       */
1276      @SuppressWarnings("JavaUtilDate")
1277      private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context,
1278        final List<byte[]> refs) throws IOException {
1279        StringBuilder refsSb = null;
1280        if (refs.isEmpty()) return refsSb;
1281        refsSb = new StringBuilder();
1282        String comma = "";
1283        // If a row is a reference but has no define, print the content of the row that has
1284        // this row as a 'prev'; it will help debug. The missing row was written just before
1285        // the row we are dumping out here.
1286        TableName tn = getTableName(context.getConfiguration());
1287        try (Table t = this.connection.getTable(tn)) {
1288          for (byte[] ref : refs) {
1289            Result r = t.get(new Get(ref));
1290            List<Cell> cells = r.listCells();
1291            String ts = (cells != null && !cells.isEmpty())
1292              ? new java.util.Date(cells.get(0).getTimestamp()).toString()
1293              : "";
1294            byte[] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
1295            String jobStr = (b != null && b.length > 0) ? Bytes.toString(b) : "";
1296            b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
1297            long count = (b != null && b.length > 0) ? Bytes.toLong(b) : -1;
1298            b = r.getValue(FAMILY_NAME, COLUMN_PREV);
1299            String refRegionLocation = "";
1300            String keyRegionLocation = "";
1301            if (b != null && b.length > 0) {
1302              try (RegionLocator rl = this.connection.getRegionLocator(tn)) {
1303                HRegionLocation hrl = rl.getRegionLocation(b);
1304                if (hrl != null) refRegionLocation = hrl.toString();
                // key.getBytes() returns BytesWritable's padded backing array, so this row
                // key probably carries trailing zero bytes.
1306                hrl = rl.getRegionLocation(key.getBytes());
1307                if (hrl != null) keyRegionLocation = hrl.toString();
1308              }
1309            }
            LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref)
              + ", refPrevEqualsKey="
              + (b != null
                && Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0)
              + ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength())
              + ", ref row date=" + ts + ", jobStr=" + jobStr + ", ref row count=" + count
              + ", ref row regionLocation=" + refRegionLocation + ", key row regionLocation="
              + keyRegionLocation);
1317            refsSb.append(comma);
1318            comma = ",";
1319            refsSb.append(Bytes.toStringBinary(ref));
1320          }
1321        }
1322        return refsSb;
1323      }
1324    }
1325
1326    @Override
1327    public int run(String[] args) throws Exception {
      if (args.length != 2) {
        System.err
          .println("Usage : " + Verify.class.getSimpleName() + " <output dir> <num reducers>");
        return 1;
      }
1333
1334      String outputDir = args[0];
1335      int numReducers = Integer.parseInt(args[1]);
1336
1337      return run(outputDir, numReducers);
1338    }
1339
1340    public int run(String outputDir, int numReducers) throws Exception {
1341      return run(new Path(outputDir), numReducers);
1342    }
1343
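    /**
     * Runs the verification job: a full-table scan of COLUMN_PREV (plus the big and tiny
     * families when enabled) feeding VerifyMapper and VerifyReducer, then inspects the job
     * counters for outright failures.
     */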
1344    public int run(Path outputDir, int numReducers) throws Exception {
1345      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
1346
1347      job = Job.getInstance(getConf());
1348
1349      job.setJobName("Link Verifier");
1350      job.setNumReduceTasks(numReducers);
1351      job.setJarByClass(getClass());
1352
1353      setJobScannerConf(job);
1354
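      // One-pass full-table scan: fetch rows in large batches and bypass the block cache so
      // verification does not evict the region servers' working set.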
1355      Scan scan = new Scan();
1356      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1357      scan.setCaching(10000);
1358      scan.setCacheBlocks(false);
1359      if (isMultiUnevenColumnFamilies(getConf())) {
1360        scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME);
1361        scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME);
1362      }
1363
1364      TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
1365        VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
1366      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
1367        AbstractHBaseTool.class);
1368
1369      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
1370
1371      job.setReducerClass(VerifyReducer.class);
1372      job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
1373      job.setOutputKeyClass(BytesWritable.class);
1374      job.setOutputValueClass(BytesWritable.class);
      SequenceFileAsBinaryOutputFormat.setOutputPath(job, outputDir);
1376
1377      boolean success = job.waitForCompletion(true);
1378
1379      if (success) {
1380        Counters counters = job.getCounters();
        if (counters == null) {
          LOG.warn("Counters were null; cannot verify Job completion."
            + " This is commonly a result of insufficient YARN configuration.");
1384          // We don't have access to the counters to know if we have "bad" counts
1385          return 0;
1386        }
1387
1388        // If we find no unexpected values, the job didn't outright fail
1389        if (verifyUnexpectedValues(counters)) {
1390          // We didn't check referenced+unreferenced counts, leave that to visual inspection
1391          return 0;
1392        }
1393      }
1394
1395      // We failed
1396      return 1;
1397    }
1398
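    /**
     * Checks the completed job's counters: REFERENCED must match the expected count and none
     * of the failure counters may be non-zero. Only call this after run() has completed.
     */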
1399    public boolean verify(long expectedReferenced) throws Exception {
1400      if (job == null) {
1401        throw new IllegalStateException("You should call run() first");
1402      }
1403
1404      Counters counters = job.getCounters();
1405      if (counters == null) {
        LOG.info("Counters object was null; write verification cannot be performed."
          + " This is commonly a result of insufficient YARN configuration.");
1408        return false;
1409      }
1410
1411      // Run through each check, even if we fail one early
1412      boolean success = verifyExpectedValues(expectedReferenced, counters);
1413
1414      if (!verifyUnexpectedValues(counters)) {
1415        // We found counter objects which imply failure
1416        success = false;
1417      }
1418
1419      if (!success) {
1420        handleFailure(counters);
1421      }
1422      return success;
1423    }
1424
    /**
     * Verify the values in the Counters against the expected number of entries written.
     * @param expectedReferenced Expected number of referenced entries
     * @param counters           The Job's Counters object
     * @return True if the values match what's expected, false otherwise
     */
1430    protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
1431      final Counter referenced = counters.findCounter(VerifyCounts.REFERENCED);
1432      final Counter unreferenced = counters.findCounter(VerifyCounts.UNREFERENCED);
1433      boolean success = true;
1434
1435      if (expectedReferenced != referenced.getValue()) {
        LOG.error("Expected referenced count does not match actual referenced count. "
          + "expected referenced=" + expectedReferenced + ", actual=" + referenced.getValue());
1438        success = false;
1439      }
1440
1441      if (unreferenced.getValue() > 0) {
1442        final Counter multiref = counters.findCounter(VerifyCounts.EXTRAREFERENCES);
1443        boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
1444        LOG.error(
1445          "Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
1446            + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
1447        success = false;
1448      }
1449
1450      return success;
1451    }
1452
    /**
     * Verify that the Counters don't contain values which indicate an outright failure from the
     * Reducers.
     * @param counters The Job's counters
     * @return True if the "bad" counter objects are 0, false otherwise
     */
1458    protected boolean verifyUnexpectedValues(Counters counters) {
1459      final Counter undefined = counters.findCounter(VerifyCounts.UNDEFINED);
1460      final Counter lostfamilies = counters.findCounter(VerifyCounts.LOST_FAMILIES);
1461      boolean success = true;
1462
1463      if (undefined.getValue() > 0) {
1464        LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
1465        success = false;
1466      }
1467
1468      if (lostfamilies.getValue() > 0) {
1469        LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
1470        success = false;
1471      }
1472
1473      return success;
1474    }
1475
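    /**
     * On verification failure, resolves every row recorded in the 'undef' and 'unref'
     * counter groups to its region location and logs it, which helps narrow a data-loss
     * problem down to a particular region or server.
     */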
1476    protected void handleFailure(Counters counters) throws IOException {
1477      Configuration conf = job.getConfiguration();
1478      TableName tableName = getTableName(conf);
1479      try (Connection conn = ConnectionFactory.createConnection(conf)) {
1480        try (RegionLocator rl = conn.getRegionLocator(tableName)) {
1481          CounterGroup g = counters.getGroup("undef");
1482          Iterator<Counter> it = g.iterator();
1483          while (it.hasNext()) {
1484            String keyString = it.next().getName();
1485            byte[] key = Bytes.toBytes(keyString);
1486            HRegionLocation loc = rl.getRegionLocation(key, true);
1487            LOG.error("undefined row " + keyString + ", " + loc);
1488          }
1489          g = counters.getGroup("unref");
1490          it = g.iterator();
1491          while (it.hasNext()) {
1492            String keyString = it.next().getName();
1493            byte[] key = Bytes.toBytes(keyString);
1494            HRegionLocation loc = rl.getRegionLocation(key, true);
            LOG.error("unreferenced row " + keyString + ", " + loc);
1496          }
1497        }
1498      }
1499    }
1500  }
1501
1502  /**
1503   * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration
1504   * adds more data.
1505   */
1506  static class Loop extends Configured implements Tool {
1507
1508    private static final Logger LOG = LoggerFactory.getLogger(Loop.class);
    private static final String USAGE = "Usage: Loop <num iterations> <num mappers> "
      + "<num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>"
      + " <num walker threads>]\n"
      + "where <num nodes per mapper> should be a multiple of width*wrap multiplier,"
      + " 25M by default\n"
      + "walkers will select and verify random flushed loops during Generation.";
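    // Illustrative invocation (paths and sizes are placeholders):
    //   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList loop \
    //     1 4 1000000 /tmp/itbll-out 4
    // runs one Generate/Verify iteration with 4 mappers writing one million nodes each.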
1514
1515    IntegrationTestBigLinkedList it;
1516
1517    protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
1518      Integer wrapMultiplier, Integer numWalkers) throws Exception {
1519      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // unique subdir per iteration so outputs don't collide
1521      Path generatorOutput = new Path(outputPath, uuid.toString());
1522
1523      Generator generator = new Generator();
1524      generator.setConf(getConf());
1525      int retCode =
1526        generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, numWalkers);
1527      if (retCode > 0) {
1528        throw new RuntimeException("Generator failed with return code: " + retCode);
1529      }
1530      if (numWalkers > 0) {
1531        if (!generator.verify()) {
1532          throw new RuntimeException("Generator.verify failed");
1533        }
1534      }
1535      LOG.info("Generator finished with success. Total nodes=" + numNodes);
1536    }
1537
1538    protected void runVerify(String outputDir, int numReducers, long expectedNumNodes)
1539      throws Exception {
1540      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // unique subdir per iteration so outputs don't collide
1542      Path iterationOutput = new Path(outputPath, uuid.toString());
1543
1544      Verify verify = new Verify();
1545      verify.setConf(getConf());
1546      int retCode = verify.run(iterationOutput, numReducers);
1547      if (retCode > 0) {
1548        throw new RuntimeException("Verify.run failed with return code: " + retCode);
1549      }
1550
1551      if (!verify.verify(expectedNumNodes)) {
1552        throw new RuntimeException("Verify.verify failed");
1553      }
1554      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
1555    }
1556
1557    @Override
1558    public int run(String[] args) throws Exception {
1559      if (args.length < 5) {
1560        System.err.println(USAGE);
1561        return 1;
1562      }
1563      try {
1564        int numIterations = Integer.parseInt(args[0]);
1565        int numMappers = Integer.parseInt(args[1]);
1566        long numNodes = Long.parseLong(args[2]);
1567        String outputDir = args[3];
1568        int numReducers = Integer.parseInt(args[4]);
1569        Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
1570        Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
1571        Integer numWalkers = (args.length < 8) ? 0 : Integer.parseInt(args[7]);
1572
1573        long expectedNumNodes = 0;
1574
1575        if (numIterations < 0) {
1576          numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
1577        }
        LOG.info("Running Loop with args: " + Arrays.deepToString(args));
1579        for (int i = 0; i < numIterations; i++) {
1580          LOG.info("Starting iteration = " + i);
1581          runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers);
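          // Data is never cleaned between iterations, so the expected total grows by
          // numMappers * numNodes on each pass.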
1582          expectedNumNodes += numMappers * numNodes;
1583          runVerify(outputDir, numReducers, expectedNumNodes);
1584        }
1585        return 0;
1586      } catch (NumberFormatException e) {
1587        System.err.println("Parsing loop arguments failed: " + e.getMessage());
1588        System.err.println(USAGE);
1589        return 1;
1590      }
1591    }
1592  }
1593
1594  /**
   * A standalone program that prints out portions of a list created by {@link Generator}.
1596   */
1597  private static class Print extends Configured implements Tool {
1598    @Override
1599    public int run(String[] args) throws Exception {
1600      Options options = new Options();
1601      options.addOption("s", "start", true, "start key");
1602      options.addOption("e", "end", true, "end key");
1603      options.addOption("l", "limit", true, "number to print");
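      // Illustrative use (keys are binary strings, as printed by Bytes.toStringBinary):
      //   print -s '\x00\x01' -l 10
      // dumps the first ten nodes at or after that start key.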
1604
1605      GnuParser parser = new GnuParser();
1606      CommandLine cmd = null;
1607      try {
1608        cmd = parser.parse(options, args);
1609        if (cmd.getArgs().length != 0) {
1610          throw new ParseException("Command takes no arguments");
1611        }
1612      } catch (ParseException e) {
        System.err.println("Failed to parse command line: " + e.getMessage());
1614        System.err.println();
1615        HelpFormatter formatter = new HelpFormatter();
1616        formatter.printHelp(getClass().getSimpleName(), options);
1617        System.exit(-1);
1618      }
1619
      Scan scan = new Scan();
      scan.setBatch(10000);

      if (cmd.hasOption("s")) scan.withStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));

      if (cmd.hasOption("e")) {
        scan.withStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));
      }

      int limit = cmd.hasOption("l") ? Integer.parseInt(cmd.getOptionValue("l")) : 100;

      // try-with-resources closes the scanner, table and connection even if the scan throws
      try (Connection connection = ConnectionFactory.createConnection(getConf());
        Table table = connection.getTable(getTableName(getConf()));
        ResultScanner scanner = table.getScanner(scan)) {
        CINode node = new CINode();
        Result result = scanner.next();
        int count = 0;
        while (result != null && count++ < limit) {
          node = getCINode(result, node);
          System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
            Bytes.toStringBinary(node.prev), node.count, node.client);
          result = scanner.next();
        }
      }

      return 0;
1652    }
1653  }
1654
1655  /**
   * A standalone program that deletes a single node.
1657   */
1658  private static class Delete extends Configured implements Tool {
1659    @Override
1660    public int run(String[] args) throws Exception {
      if (args.length != 1) {
        System.err.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
        return 1;
      }
1665      byte[] val = Bytes.toBytesBinary(args[0]);
1666
1667      org.apache.hadoop.hbase.client.Delete delete = new org.apache.hadoop.hbase.client.Delete(val);
1668
1669      try (Connection connection = ConnectionFactory.createConnection(getConf());
1670        Table table = connection.getTable(getTableName(getConf()))) {
1671        table.delete(delete);
1672      }
1673
1674      System.out.println("Delete successful");
1675      return 0;
1676    }
1677  }
1678
1679  abstract static class WalkerBase extends Configured {
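    /**
     * Finds the first node at or after startKey with a single-row scan, printing the lookup
     * latency as an 'FSR' line (presumably 'find start row'). Returns null when the scan
     * comes up empty.
     */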
1680    protected static CINode findStartNode(Table table, byte[] startKey) throws IOException {
1681      Scan scan = new Scan();
1682      scan.withStartRow(startKey);
1683      scan.setBatch(1);
1684      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1685
1686      long t1 = EnvironmentEdgeManager.currentTime();
1687      ResultScanner scanner = table.getScanner(scan);
1688      Result result = scanner.next();
1689      long t2 = EnvironmentEdgeManager.currentTime();
1690      scanner.close();
1691
1692      if (result != null) {
1693        CINode node = getCINode(result, new CINode());
1694        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
1695        return node;
1696      }
1697
1698      System.out.println("FSR " + (t2 - t1));
1699
1700      return null;
1701    }
1702
1703    protected CINode getNode(byte[] row, Table table, CINode node) throws IOException {
1704      Get get = new Get(row);
1705      get.addColumn(FAMILY_NAME, COLUMN_PREV);
1706      Result result = table.get(get);
1707      return getCINode(result, node);
1708    }
1709  }
1710
1711  /**
   * A standalone program that follows a linked list created by {@link Generator} and prints timing
1713   * info.
1714   */
1715  private static class Walker extends WalkerBase implements Tool {
1716
1717    public Walker() {
1718    }
1719
1720    @Override
1721    public int run(String[] args) throws IOException {
1722
1723      Options options = new Options();
1724      options.addOption("n", "num", true, "number of queries");
1725      options.addOption("s", "start", true, "key to start at, binary string");
1726      options.addOption("l", "logevery", true, "log every N queries");
1727
1728      GnuParser parser = new GnuParser();
1729      CommandLine cmd = null;
1730      try {
1731        cmd = parser.parse(options, args);
1732        if (cmd.getArgs().length != 0) {
1733          throw new ParseException("Command takes no arguments");
1734        }
1735      } catch (ParseException e) {
        System.err.println("Failed to parse command line: " + e.getMessage());
1737        System.err.println();
1738        HelpFormatter formatter = new HelpFormatter();
1739        formatter.printHelp(getClass().getSimpleName(), options);
1740        System.exit(-1);
1741      }
1742
1743      long maxQueries = Long.MAX_VALUE;
1744      if (cmd.hasOption('n')) {
1745        maxQueries = Long.parseLong(cmd.getOptionValue("n"));
1746      }
1747      boolean isSpecificStart = cmd.hasOption('s');
1748
1749      byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
1750      int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;
1751
1752      Connection connection = ConnectionFactory.createConnection(getConf());
1753      Table table = connection.getTable(getTableName(getConf()));
1754      long numQueries = 0;
1755      // If isSpecificStart is set, only walk one list from that particular node.
1756      // Note that in case of circular (or P-shaped) list it will walk forever, as is
1757      // the case in normal run without startKey.
1758      while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
1759        if (!isSpecificStart) {
1760          startKey = new byte[ROWKEY_LENGTH];
1761          Bytes.random(startKey);
1762        }
1763        CINode node = findStartNode(table, startKey);
1764        if (node == null && isSpecificStart) {
1765          System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
1766        }
1767        numQueries++;
1768        while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) {
1769          byte[] prev = node.prev;
1770          long t1 = EnvironmentEdgeManager.currentTime();
1771          node = getNode(prev, table, node);
1772          long t2 = EnvironmentEdgeManager.currentTime();
1773          if (logEvery > 0 && numQueries % logEvery == 0) {
1774            System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
1775          }
1776          numQueries++;
1777          if (node == null) {
1778            System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
1779          } else if (node.prev.length == NO_KEY.length) {
1780            System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
1781          }
1782        }
1783      }
1784      table.close();
1785      connection.close();
1786      return 0;
1787    }
1788  }
1789
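  /**
   * A standalone program that cleans up after a run: disables and drops the test table, then
   * deletes the given output directory.
   */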
1790  private static class Clean extends Configured implements Tool {
1791    @Override
1792    public int run(String[] args) throws Exception {
1793      if (args.length < 1) {
1794        System.err.println("Usage: Clean <output dir>");
1795        return -1;
1796      }
1797
1798      Path p = new Path(args[0]);
1799      Configuration conf = getConf();
1800      TableName tableName = getTableName(conf);
1801      try (FileSystem fs = HFileSystem.get(conf);
1802        Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin()) {
1803        if (admin.tableExists(tableName)) {
1804          admin.disableTable(tableName);
1805          admin.deleteTable(tableName);
1806        }
1807
1808        if (fs.exists(p)) {
1809          fs.delete(p, true);
1810        }
1811      }
1812
1813      return 0;
1814    }
1815  }
1816
1817  static TableName getTableName(Configuration conf) {
1818    return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1819  }
1820
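  /**
   * Fills the reusable CINode from a Result, substituting NO_KEY, -1 and the empty string
   * when the prev, count or client column is absent.
   */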
1821  private static CINode getCINode(Result result, CINode node) {
1822    node.key = Bytes.copy(result.getRow());
1823    if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
1824      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
1825    } else {
1826      node.prev = NO_KEY;
1827    }
1828    if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
1829      node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
1830    } else {
1831      node.count = -1;
1832    }
1833    if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
1834      node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
1835    } else {
1836      node.client = "";
1837    }
1838    return node;
1839  }
1840
1841  @Override
1842  public void setUpCluster() throws Exception {
1843    util = getTestingUtil(getConf());
1844    boolean isDistributed = util.isDistributedCluster();
1845    util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
1846    if (!isDistributed) {
1847      util.startMiniMapReduceCluster();
1848    }
1849    this.setConf(util.getConfiguration());
1850  }
1851
1852  @Override
1853  public void cleanUpCluster() throws Exception {
1854    super.cleanUpCluster();
1855    if (util.isDistributedCluster()) {
1856      util.shutdownMiniMapReduceCluster();
1857    }
1858  }
1859
1860  private static boolean isMultiUnevenColumnFamilies(Configuration conf) {
1861    return conf.getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
1862  }
1863
1864  @Test
  public void testContinuousIngest() throws Exception {
1866    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
1867    Configuration conf = getTestingUtil(getConf()).getConfiguration();
1868    if (isMultiUnevenColumnFamilies(getConf())) {
1869      // make sure per CF flush is on
1870      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
1871        FlushAllLargeStoresPolicy.class.getName());
1872    }
1873    int ret = ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
1874      util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
1875    org.junit.Assert.assertEquals(0, ret);
1876  }
1877
1878  private void usage() {
1879    System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
1880    printCommands();
1881  }
1882
1883  private void printCommands() {
1884    System.err.println("Commands:");
1885    System.err.println(" generator  Map only job that generates data.");
    System.err.println(" verify     A map reduce job that looks for holes. Check the return code");
    System.err.println("            and the counts after running: REFERENCED and UNREFERENCED");
    System.err.println("            counts are ok; any UNDEFINED counts are bad. Do not run at");
    System.err.println("            the same time as the Generator.");
1890    System.err.println(" walker     "
1891      + "Standalone program that starts following a linked list & emits timing info.");
1892    System.err.println(" print      Standalone program that prints nodes in the linked list.");
1893    System.err.println(" delete     Standalone program that deletes a single node.");
    System.err.println(" loop       Program to loop through Generator and Verify steps.");
1895    System.err.println(" clean      Program to clean all left over detritus.");
1896    System.err.println(" search     Search for missing keys.");
1897    System.err.println("");
1898    System.err.println("General options:");
1899    System.err.println(" -D" + TABLE_NAME_KEY + "=<tableName>");
1900    System.err.println(
1901      "    Run using the <tableName> as the tablename.  Defaults to " + DEFAULT_TABLE_NAME);
1902    System.err.println(" -D" + REGIONS_PER_SERVER_KEY + "=<# regions>");
1903    System.err.println("    Create table with presplit regions per server.  Defaults to "
1904      + DEFAULT_REGIONS_PER_SERVER);
1905
1906    System.err.println(" -DuseMob=<true|false>");
    System.err.println(
      "    Create table so that the mob read/write path is forced. Defaults to false");
1909
1910    System.err.flush();
1911  }
1912
1913  @Override
1914  protected void processOptions(CommandLine cmd) {
1915    super.processOptions(cmd);
1916    String[] args = cmd.getArgs();
1917    // get the class, run with the conf
1918    if (args.length < 1) {
1919      printUsage(this.getClass().getSimpleName() + " <general options> COMMAND [<COMMAND options>]",
1920        "General options:", "");
1921      printCommands();
1922      // Have to throw an exception here to stop the processing. Looks ugly but gets message across.
1923      throw new RuntimeException("Incorrect Number of args.");
1924    }
1925    toRun = args[0];
1926    otherArgs = Arrays.copyOfRange(args, 1, args.length);
1927  }
1928
1929  @Override
1930  public int runTestFromCommandLine() throws Exception {
1931    Tool tool = null;
1932    if (toRun.equalsIgnoreCase("Generator")) {
1933      tool = new Generator();
1934    } else if (toRun.equalsIgnoreCase("Verify")) {
1935      tool = new Verify();
1936    } else if (toRun.equalsIgnoreCase("Loop")) {
1937      Loop loop = new Loop();
1938      loop.it = this;
1939      tool = loop;
1940    } else if (toRun.equalsIgnoreCase("Walker")) {
1941      tool = new Walker();
1942    } else if (toRun.equalsIgnoreCase("Print")) {
1943      tool = new Print();
1944    } else if (toRun.equalsIgnoreCase("Delete")) {
1945      tool = new Delete();
1946    } else if (toRun.equalsIgnoreCase("Clean")) {
1947      tool = new Clean();
1948    } else if (toRun.equalsIgnoreCase("Search")) {
1949      tool = new Search();
1950    } else {
1951      usage();
1952      throw new RuntimeException("Unknown arg");
1953    }
1954
1955    return ToolRunner.run(getConf(), tool, otherArgs);
1956  }
1957
1958  @Override
1959  public TableName getTablename() {
1960    Configuration c = getConf();
1961    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1962  }
1963
1964  @Override
1965  protected Set<String> getColumnFamilies() {
1966    if (isMultiUnevenColumnFamilies(getConf())) {
1967      return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
1968        Bytes.toString(TINY_FAMILY_NAME));
1969    } else {
1970      return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
1971    }
1972  }
1973
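  /**
   * Copies the Generator sizing parameters into the job configuration; null optional values
   * leave the configured defaults in place.
   */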
1974  private static void setJobConf(Job job, int numMappers, long numNodes, Integer width,
1975    Integer wrapMultiplier, Integer numWalkers) {
1976    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
1977    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
1978    if (width != null) {
1979      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
1980    }
1981    if (wrapMultiplier != null) {
1982      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
1983    }
1984    if (numWalkers != null) {
1985      job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers);
1986    }
1987  }
1988
1989  public static void setJobScannerConf(Job job) {
1990    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
1991  }
1992
1993  public static void main(String[] args) throws Exception {
1994    Configuration conf = HBaseConfiguration.create();
1995    IntegrationTestingUtility.setUseDistributedCluster(conf);
1996    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
1997    System.exit(ret);
1998  }
1999}