/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile
 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and
 * values like those of {@link PerformanceEvaluation}.
 */
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestCellBasedHFileOutputFormat2 {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCellBasedHFileOutputFormat2.class);

  private final static int ROWSPERSPLIT = 1024;

  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
  private static final byte[][] FAMILIES =
    { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) };
  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", "TestTable3")
    .map(TableName::valueOf).toArray(TableName[]::new);

  private HBaseTestingUtility util = new HBaseTestingUtility();

  private static final Logger LOG = LoggerFactory.getLogger(TestCellBasedHFileOutputFormat2.class);

  /**
   * Simple mapper that makes KeyValue output.
   */
  static class RandomKVGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
      throws java.io.IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
      Random random = new Random();
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          random.nextBytes(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          random.nextBytes(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestCellBasedHFileOutputFormat2.FAMILIES) {
            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
            context.write(new ImmutableBytesWritable(key), kv);
          }
        }
      }
    }
  }

  /**
   * Simple mapper that makes Put output.
   */
  static class RandomPutGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
      throws java.io.IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

      Random random = new Random();
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          random.nextBytes(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          random.nextBytes(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestCellBasedHFileOutputFormat2.FAMILIES) {
            Put p = new Put(keyBytes);
            p.addColumn(family, QUALIFIER, valBytes);
            // set TTL to very low so that the scan does not return any value
            p.setTTL(1L);
            context.write(new ImmutableBytesWritable(key), p);
          }
        }
      }
    }
  }

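  /**
   * Configures the job to use {@link NMapInputFormat} and one of the random data generating
   * mappers above: {@link RandomPutGeneratingMapper} when a PutSortReducer pipeline is wanted,
   * otherwise {@link RandomKVGeneratingMapper}.
   */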
  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
    if (putSortReducer) {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomPutGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(Put.class);
    } else {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomKVGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose
   * timestamp is {@link HConstants#LATEST_TIMESTAMP}.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be
      // changed by the call to write. Check that everything in the kv is the same except the ts.
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
      assertNotSame(original.getTimestamp(), kv.getTimestamp());
      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());

      // Test 2. Now pass a kv that has an explicit ts. It should not be
      // changed by the call to write.
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

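  /**
   * Builds a test {@link TaskAttemptContext} for the given job through the {@link HadoopShims}
   * compatibility layer, using a fixed task attempt id.
   */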
  private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception {
    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
    TaskAttemptContext context =
      hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0");
    return context;
  }

  /*
   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by
   * time-restricted scans.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Timerange dir writing to dir: " + dir);
    try {
      // build a record writer using HFileOutputFormat2
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // Pass two key values with explicit timestamps
      final byte[] b = Bytes.toBytes("b");

      // value 1 with timestamp 2000
      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // value 2 with timestamp 1000
      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // verify that the file has the proper FileInfo.
      writer.close(context);

      // the generated file lives 1 directory down from the attempt directory
      // and is the only file, e.g.
      // _attempt__0000_r_000000_0/b/1979617994050536795
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // open as HFile Reader and pull out TIMERANGE FileInfo.
      HFile.Reader rd =
        HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
      Map<byte[], byte[]> finfo = rd.getHFileInfo();
      byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8"));
      assertNotNull(range);

      // unmarshal and check values.
      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
      LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax());
      assertEquals(1000, timeRangeTracker.getMin());
      assertEquals(2000, timeRangeTracker.getMax());
      rd.close();
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Run small MR job.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

    // Lower this value or we OOM in Eclipse.
    conf.setInt("mapreduce.task.io.sort.mb", 20);
    // Write a few files.
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job, false);
    // This partitioner doesn't work well for number keys, but we use it anyway
    // just to demonstrate how to configure it.
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];

    Arrays.fill(startKey, (byte) 0);
    Arrays.fill(endKey, (byte) 0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    // Set start and end rows for partitioner.
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
    job.setReducerClass(CellSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    job.setNumReduceTasks(4);
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus[] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile.
   */
  @Test
  public void test_WritingTagData() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("WritingTagData");
    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      List<Tag> tags = new ArrayList<>();
      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(conf, false, false, false);
        scanner.seekTo();
        Cell cell = scanner.getCell();
        List<Tag> tagsFromCell =
          TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
        assertTrue(tagsFromCell.size() > 0);
        for (Tag tag : tagsFromCell) {
          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
        }
      }
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
      util.getDataTestDir("testJobConfiguration").toString());
    Job job = new Job(conf);
    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    setupMockStartKeys(regionLocator);
    setupMockTableName(regionLocator);
    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
    assertEquals(4, job.getNumReduceTasks());
  }

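  /**
   * Generates {@code numKeys} random region start keys; the first key is always the empty byte
   * array, as required for the first region of a table.
   */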
  private byte[][] generateRandomStartKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    // first region start key is always empty
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

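  /**
   * Generates {@code numKeys} random split keys used when creating pre-split test tables.
   */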
  private byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoad() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoad\n");
    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithSplit() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
  }

  /**
   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true. This test can only check the
   * correctness of the original logic if LOCALITY_SENSITIVE_CONF_KEY is set to true. Because
   * MiniHBaseCluster always runs with a single hostname (and different ports), it is not possible
   * to check region locality by comparing region locations and DN hostnames. Once MiniHBaseCluster
   * supports an explicit hostnames parameter (just like MiniDFSCluster does), we can test the
   * region locality features more easily.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithLocality() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
  }

  @Test
  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, String tableStr) throws Exception {
    doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer, false,
      Arrays.asList(tableStr));
  }

  @Test
  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true, false,
      Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
  }

  @Test
  public void testMultiMRIncrementalLoadWithPutSortReducerWithNamespaceInPath() throws Exception {
    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducerWithNamespaceInPath\n");
    doIncrementalLoadTest(false, false, true, true,
      Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
  }

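  /**
   * Core incremental-load round trip: starts a mini cluster, creates the requested tables, runs a
   * MapReduce job that writes HFiles via {@link HFileOutputFormat2} (or
   * {@link MultiTableHFileOutputFormat} for multiple tables), optionally recreates the target
   * regions, bulk loads the files with {@link LoadIncrementalHFiles}, and verifies row counts,
   * block locality, and that the data survives a region reopen.
   */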
  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, boolean shouldWriteToTableWithNamespace, List<String> tableStr)
    throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
    if (shouldWriteToTableWithNamespace) {
      conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true);
    }
    int hostCount = 1;
    int regionNum = 5;
    if (shouldKeepLocality) {
      // We should raise the host count above the HDFS replica count once MiniHBaseCluster
      // supports an explicit hostnames parameter, just like MiniDFSCluster does.
      hostCount = 3;
      regionNum = 20;
    }

    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    util.startMiniCluster(1, hostCount, hostnames);

    Map<String, Table> allTables = new HashMap<>(tableStr.size());
    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
    boolean writeMultipleTables = tableStr.size() > 1;
    for (String tableStrSingle : tableStr) {
      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
      TableName tableName = TableName.valueOf(tableStrSingle);
      Table table = util.createTable(tableName, FAMILIES, splitKeys);

      RegionLocator r = util.getConnection().getRegionLocator(tableName);
      assertEquals("Should start with empty table", 0, util.countRows(table));
      int numRegions = r.getStartKeys().length;
      assertEquals("Should make " + regionNum + " regions", regionNum, numRegions);

      allTables.put(tableStrSingle, table);
      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r));
    }
    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
    // Generate the bulk load files
    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
    if (shouldWriteToTableWithNamespace) {
      testDir = new Path(testDir, "default");
    }

    for (Table tableSingle : allTables.values()) {
      // This doesn't write into the table, just makes files
      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
    }
    int numTableDirs = 0;
    FileStatus[] fss = testDir.getFileSystem(conf).listStatus(testDir);
    for (FileStatus tf : fss) {
      Path tablePath = testDir;

      if (writeMultipleTables) {
        if (allTables.containsKey(tf.getPath().getName())) {
          ++numTableDirs;
          tablePath = tf.getPath();
        } else {
          continue;
        }
      }

      // Make sure that a directory was created for every CF
      int dir = 0;
      fss = tablePath.getFileSystem(conf).listStatus(tablePath);
      for (FileStatus f : fss) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
    }
    if (writeMultipleTables) {
      assertEquals("Dir for all input tables not created", allTables.size(), numTableDirs);
    }

    Admin admin = util.getConnection().getAdmin();
    try {
      // handle the split case
      if (shouldChangeRegions) {
        // Choose a semi-random table if multiple tables are available
        Table chosenTable = allTables.values().iterator().next();
        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
        admin.disableTable(chosenTable.getName());
        util.waitUntilNoRegionsInTransition();

        util.deleteTable(chosenTable.getName());
        byte[][] newSplitKeys = generateRandomSplitKeys(14);
        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);

        while (
          util.getConnection().getRegionLocator(chosenTable.getName()).getAllRegionLocations()
            .size() != 15 || !admin.isTableAvailable(table.getName())
        ) {
          Thread.sleep(200);
          LOG.info("Waiting for new region assignment to happen");
        }
      }

      // Perform the actual load
      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
        Path tableDir = testDir;
        String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString();
        LOG.info("Running LoadIncrementalHFiles on table " + tableNameStr);
        if (writeMultipleTables) {
          tableDir = new Path(testDir, tableNameStr);
        }
        Table currentTable = allTables.get(tableNameStr);
        TableName currentTableName = currentTable.getName();
        new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable,
          singleTableInfo.getRegionLocator());

        // Ensure data shows up
        int expectedRows = 0;
        if (putSortReducer) {
          // no rows should be extracted
          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
            util.countRows(currentTable));
        } else {
          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
            util.countRows(currentTable));
          Scan scan = new Scan();
          ResultScanner results = currentTable.getScanner(scan);
          for (Result res : results) {
            assertEquals(FAMILIES.length, res.rawCells().length);
            Cell first = res.rawCells()[0];
            for (Cell kv : res.rawCells()) {
              assertTrue(CellUtil.matchingRows(first, kv));
              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
            }
          }
          results.close();
        }
        String tableDigestBefore = util.checksumRows(currentTable);
        // Check region locality
        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
          hbd.add(region.getHDFSBlocksDistribution());
        }
        for (String hostname : hostnames) {
          float locality = hbd.getBlockLocalityIndex(hostname);
          LOG.info("locality of [" + hostname + "]: " + locality);
          assertEquals(100, (int) (locality * 100));
        }

        // Cause regions to reopen
        admin.disableTable(currentTableName);
        while (!admin.isTableDisabled(currentTableName)) {
          Thread.sleep(200);
          LOG.info("Waiting for table to disable");
        }
        admin.enableTable(currentTableName);
        util.waitTableAvailable(currentTableName);
        assertEquals("Data should remain after reopening of regions", tableDigestBefore,
          util.checksumRows(currentTable));
      }
    } finally {
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        tableInfoSingle.getRegionLocator().close();
      }
      for (Entry<String, Table> singleTable : allTables.entrySet()) {
        singleTable.getValue().close();
        util.deleteTable(singleTable.getValue().getName());
      }
      testDir.getFileSystem(conf).delete(testDir, true);
      util.shutdownMiniCluster();
    }
  }

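  /**
   * Configures and runs the MapReduce job that generates the HFiles consumed by the incremental
   * load tests, asserting that one reduce task is created per region.
   */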
  private void runIncrementalPELoad(Configuration conf,
    List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
    throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new Job(conf, "testLocalMRIncrementalLoad");
    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());
    setupRandomGeneratorMapper(job, putSortReducer);
    if (tableInfo.size() > 1) {
      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
      int sum = 0;
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
      }
      assertEquals(sum, job.getNumReduceTasks());
    } else {
      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(),
        regionLocator);
      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
    }

    FileOutputFormat.setOutputPath(job, outDir);

    assertFalse(util.getTestFileSystem().exists(outDir));

    assertTrue(job.waitForCompletion(true));
  }

  /**
   * Test for {@link HFileOutputFormat2#configureCompression(Configuration, HTableDescriptor)} and
   * {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the
   * compression map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Compression.Algorithm> familyToCompression =
        getMockColumnFamiliesForCompression(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForCompression(table, familyToCompression);
      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific compression setting from the configuration
      Map<byte[], Algorithm> retrievedFamilyToCompressionMap =
        HFileOutputFormat2.createFamilyCompressionMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
        assertEquals("Compression configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForCompression(Table table,
    Map<String, Compression.Algorithm> familyToCompression) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to compression algorithms for testing column family
   *         compression. Column family names have special characters
   */
  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }

  /**
   * Test for {@link HFileOutputFormat2#configureBloomType(HTableDescriptor, Configuration)} and
   * {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the bloom type
   * map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific bloom filter settings from the
      // configuration
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
        HFileOutputFormat2.createFamilyBloomTypeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
    Map<String, BloomType> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to bloom filter types for testing column family
   *         bloom configuration. Column family names have special characters
   */
  private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }

  /**
   * Test for {@link HFileOutputFormat2#configureBlockSize(HTableDescriptor, Configuration)} and
   * {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the block size
   * map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific block size settings from the
      // configuration
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
        HFileOutputFormat2.createFamilyBlockSizeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
    Map<String, Integer> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to block sizes for testing column family block size
   *         configuration. Column family names have special characters
   */
  private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }

  /**
   * Test for {@link HFileOutputFormat2#configureDataBlockEncoding(HTableDescriptor, Configuration)}
   * and {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that the
   * data block encoding map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
        getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
      HTableDescriptor tableDescriptor = table.getTableDescriptor();
      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(
          HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));

      // read back family specific data block encoding settings from the
      // configuration
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
        HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals(
          "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(),
          retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
    Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to data block encodings for testing column family
   *         encoding configuration. Column family names have special characters
   */
  private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
    }
    return familyToDataBlockEncoding;
  }

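  /** Stubs the mocked {@link RegionLocator} to return a fixed set of region start keys. */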
  private void setupMockStartKeys(RegionLocator table) throws IOException {
    byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"),
      Bytes.toBytes("ggg"), Bytes.toBytes("zzz") };
    Mockito.doReturn(mockKeys).when(table).getStartKeys();
  }

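  /** Stubs the mocked {@link RegionLocator} to return a fixed table name. */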
  private void setupMockTableName(RegionLocator table) throws IOException {
    TableName mockTableName = TableName.valueOf("mock_table");
    Mockito.doReturn(mockTableName).when(table).getName();
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings
   * from the column family descriptor
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testColumnFamilySettings() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("testColumnFamilySettings");

    // Setup table descriptor
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]);
    Mockito.doReturn(htd).when(table).getTableDescriptor();
    for (HColumnDescriptor hcd : HBaseTestingUtility.generateColumnDescriptors()) {
      htd.addFamily(hcd);
    }

    // set up the table to return some mock keys
    setupMockStartKeys(regionLocator);

    try {
      // partial map red setup to get an operational writer for testing
      // We turn off the sequence file compression, because DefaultCodec
      // pollutes the GZip codec pool with an incompatible compressor.
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // write out random rows
      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
      writer.close(context);

      // Make sure that a directory was created for every CF
      FileSystem fs = dir.getFileSystem(conf);

      // commit so that the filesystem has one directory per column family
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(htd.getFamilies().size(), families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
        // verify that the compression on this file matches the configured
        // compression
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();

        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals(
          "Incorrect bloom filter used for column family " + familyStr + "(reader: " + reader + ")",
          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals(
          "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")",
          hcd.getCompressionType(), reader.getFileContext().getCompression());
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Write random values to the writer assuming a table created using {@link #FAMILIES} as column
   * family descriptors
   */
  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
    TaskAttemptContext context, Set<byte[]> families, int numRows)
    throws IOException, InterruptedException {
    byte[] keyBytes = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte[] valBytes = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
    final byte[] qualifier = Bytes.toBytes("data");
    Random random = new Random();
    for (int i = 0; i < numRows; i++) {

      Bytes.putInt(keyBytes, 0, i);
      random.nextBytes(valBytes);
      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

      for (byte[] family : families) {
        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
        writer.write(key, kv);
      }
    }
  }

  /**
   * This test covers the scenario described in HBASE-6901. All files are bulk loaded and excluded
   * from minor compaction. Without the fix for HBASE-6901, an ArrayIndexOutOfBoundsException would
   * be thrown.
   */
1166  @Ignore("Flakey: See HBASE-9051")
1167  @Test
1168  public void testExcludeAllFromMinorCompaction() throws Exception {
1169    Configuration conf = util.getConfiguration();
1170    conf.setInt("hbase.hstore.compaction.min", 2);
1171    generateRandomStartKeys(5);
1172
1173    util.startMiniCluster();
1174    try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin();
1175      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1176      RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
1177      final FileSystem fs = util.getDFSCluster().getFileSystem();
1178      assertEquals("Should start with empty table", 0, util.countRows(table));
1179
1180      // deep inspection: get the StoreFile dir
1181      final Path storePath =
1182        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1183          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1184            Bytes.toString(FAMILIES[0])));
1185      assertEquals(0, fs.listStatus(storePath).length);
1186
1187      // Generate two bulk load files
1188      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
1189
1190      for (int i = 0; i < 2; i++) {
1191        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
1192        runIncrementalPELoad(conf,
1193          Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(),
1194            conn.getRegionLocator(TABLE_NAMES[0]))),
1195          testDir, false);
1196        // Perform the actual load
1197        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator);
1198      }
1199
1200      // Ensure data shows up
1201      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1202      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
1203        util.countRows(table));
1204
1205      // should have two StoreFiles now, one per bulk load
1206      assertEquals(2, fs.listStatus(storePath).length);
1207
1208      // minor compactions shouldn't get rid of the file
1209      admin.compact(TABLE_NAMES[0]);
1210      try {
1211        quickPoll(new Callable<Boolean>() {
1212          @Override
1213          public Boolean call() throws Exception {
1214            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1215            for (HRegion region : regions) {
1216              for (HStore store : region.getStores()) {
1217                store.closeAndArchiveCompactedFiles();
1218              }
1219            }
1220            return fs.listStatus(storePath).length == 1;
1221          }
1222        }, 5000);
1223        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1224      } catch (AssertionError ae) {
1225        // expected: quickPoll times out because minor compaction skips the bulk-loaded files
1226      }
1227
1228      // a major compaction should work though
1229      admin.majorCompact(TABLE_NAMES[0]);
1230      quickPoll(new Callable<Boolean>() {
1231        @Override
1232        public Boolean call() throws Exception {
1233          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1234          for (HRegion region : regions) {
1235            for (HStore store : region.getStores()) {
1236              store.closeAndArchiveCompactedFiles();
1237            }
1238          }
1239          return fs.listStatus(storePath).length == 1;
1240        }
1241      }, 5000);
1242
1243    } finally {
1244      util.shutdownMiniCluster();
1245    }
1246  }
1247
1248  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
1249  @Test
1250  public void testExcludeMinorCompaction() throws Exception {
1251    Configuration conf = util.getConfiguration();
1252    conf.setInt("hbase.hstore.compaction.min", 2);
1253    generateRandomStartKeys(5);
1254
1255    util.startMiniCluster();
1256    try (Connection conn = ConnectionFactory.createConnection(conf);
1257      Admin admin = conn.getAdmin()) {
1258      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
1259      final FileSystem fs = util.getDFSCluster().getFileSystem();
1260      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1261      assertEquals("Should start with empty table", 0, util.countRows(table));
1262
1263      // deep inspection: get the StoreFile dir
1264      final Path storePath =
1265        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1266          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1267            Bytes.toString(FAMILIES[0])));
1268      assertEquals(0, fs.listStatus(storePath).length);
1269
1270      // put some data in it and flush to create a storefile
1271      Put p = new Put(Bytes.toBytes("test"));
1272      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
1273      table.put(p);
1274      admin.flush(TABLE_NAMES[0]);
1275      assertEquals(1, util.countRows(table));
1276      quickPoll(new Callable<Boolean>() {
1277        @Override
1278        public Boolean call() throws Exception {
1279          return fs.listStatus(storePath).length == 1;
1280        }
1281      }, 5000);
1282
1283      // Generate a bulk load file with more rows
1284      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
1285
1286      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
1287      runIncrementalPELoad(conf,
1288        Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), regionLocator)),
1289        testDir, false);
1290
1291      // Perform the actual load
1292      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
1293
1294      // Ensure data shows up
1295      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1296      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows + 1,
1297        util.countRows(table));
1298
1299      // should have a second StoreFile now
1300      assertEquals(2, fs.listStatus(storePath).length);
1301
1302      // minor compactions shouldn't get rid of the file
1303      admin.compact(TABLE_NAMES[0]);
1304      try {
1305        quickPoll(new Callable<Boolean>() {
1306          @Override
1307          public Boolean call() throws Exception {
1308            return fs.listStatus(storePath).length == 1;
1309          }
1310        }, 5000);
1311        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1312      } catch (AssertionError ae) {
1313        // expected: quickPoll times out because minor compaction skips the bulk-loaded file
1314      }
1315
1316      // a major compaction should work though
1317      admin.majorCompact(TABLE_NAMES[0]);
1318      quickPoll(new Callable<Boolean>() {
1319        @Override
1320        public Boolean call() throws Exception {
1321          return fs.listStatus(storePath).length == 1;
1322        }
1323      }, 5000);
1324
1325    } finally {
1326      util.shutdownMiniCluster();
1327    }
1328  }
1329
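  /**
   * Polls the given condition roughly every 10 ms until it returns true, failing the test if it
   * has not become true within {@code waitMs} milliseconds.
   */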
1330  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1331    int sleepMs = 10;
1332    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1333    while (retries-- > 0) {
1334      if (c.call().booleanValue()) {
1335        return;
1336      }
1337      Thread.sleep(sleepMs);
1338    }
1339    fail();
1340  }
1341
1342  public static void main(String[] args) throws Exception {
1343    new TestCellBasedHFileOutputFormat2().manualTest(args);
1344  }
1345
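  /**
   * Manual driver (see {@link #main(String[])}): {@code newtable <tablename>} creates a pre-split
   * table, while {@code incremental <tablename>} runs an incremental PE load for an existing
   * table, writing HFiles under the {@code incremental-out} directory.
   */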
1346  public void manualTest(String[] args) throws Exception {
1347    Configuration conf = HBaseConfiguration.create();
1348    util = new HBaseTestingUtility(conf);
1349    if ("newtable".equals(args[0])) {
1350      TableName tname = TableName.valueOf(args[1]);
1351      byte[][] splitKeys = generateRandomSplitKeys(4);
1352      util.createTable(tname, FAMILIES, splitKeys);
1353    } else if ("incremental".equals(args[0])) {
1354      TableName tname = TableName.valueOf(args[1]);
1355      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
1356        RegionLocator regionLocator = c.getRegionLocator(tname)) {
1357        Path outDir = new Path("incremental-out");
1358        runIncrementalPELoad(conf,
1359          Arrays.asList(
1360            new HFileOutputFormat2.TableInfo(admin.getTableDescriptor(tname), regionLocator)),
1361          outDir, false);
1362      }
1363    } else {
1364      throw new RuntimeException("usage: TestCellBasedHFileOutputFormat2 newtable | incremental");
1365    }
1366  }
1367
1368  @Test
1369  public void testBlockStoragePolicy() throws Exception {
1370    util = new HBaseTestingUtility();
1371    Configuration conf = util.getConfiguration();
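    // Table-wide default policy is ALL_SSD; FAMILIES[0] is overridden to ONE_SSD via the
    // per-column-family property below.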
1372    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
1373
1374    conf.set(
1375      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
1376        .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
1377      "ONE_SSD");
1378    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
1379    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
1380    util.startMiniDFSCluster(3);
1381    FileSystem fs = util.getDFSCluster().getFileSystem();
1382    try {
1383      fs.mkdirs(cf1Dir);
1384      fs.mkdirs(cf2Dir);
1385
1386      // the default block storage policy is HOT
1387      String spA = getStoragePolicyName(fs, cf1Dir);
1388      String spB = getStoragePolicyName(fs, cf2Dir);
1389      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1390      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1391      assertEquals("HOT", spA);
1392      assertEquals("HOT", spB);
1393
1394      // alter table cf schema to change storage policies
1395      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1396        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
1397      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1398        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
1399      spA = getStoragePolicyName(fs, cf1Dir);
1400      spB = getStoragePolicyName(fs, cf2Dir);
1401      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1402      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1403      assertNotNull(spA);
1404      assertEquals("ONE_SSD", spA);
1405      assertNotNull(spB);
1406      assertEquals("ALL_SSD", spB);
1407    } finally {
1408      fs.delete(cf1Dir, true);
1409      fs.delete(cf2Dir, true);
1410      util.shutdownMiniDFSCluster();
1411    }
1412  }
1413
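  /**
   * Returns the storage policy name of {@code path}, looked up by reflectively invoking
   * {@code getStoragePolicy} on the FileSystem and falling back to the older HDFS client API
   * (defaulting to "HOT") when that method is not available.
   */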
1414  private String getStoragePolicyName(FileSystem fs, Path path) {
1415    try {
1416      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
1417      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
1418    } catch (Exception e) {
1419      // May fail when running against an older HDFS version; fall back to the old way
1420      if (LOG.isTraceEnabled()) {
1421        LOG.trace("Failed to get policy directly", e);
1422      }
1423      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
1424      return policy == null ? "HOT" : policy; // HOT by default
1425    }
1426  }
1427
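  /**
   * Fallback for HDFS versions that lack {@code FileSystem#getStoragePolicy}: reads the storage
   * policy id from the file status and maps it to a policy name via
   * {@code DistributedFileSystem#getStoragePolicies}, returning null if it cannot be determined.
   */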
1428  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
1429    try {
1430      if (fs instanceof DistributedFileSystem) {
1431        DistributedFileSystem dfs = (DistributedFileSystem) fs;
1432        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
1433        if (null != status) {
1434          byte storagePolicyId = status.getStoragePolicy();
1435          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
1436          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
1437            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
1438            for (BlockStoragePolicy policy : policies) {
1439              if (policy.getId() == storagePolicyId) {
1440                return policy.getName();
1441              }
1442            }
1443          }
1444        }
1445      }
1446    } catch (Throwable e) {
1447      LOG.warn("Failed to get block storage policy of [" + path + "]", e);
1448    }
1449
1450    return null;
1451  }
1452}