/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.client.ConnectionFactory.createConnection;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableBuilder;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a MapReduce job that writes HFile
 * output. Creates a few inner classes to implement splits and an input format that emits keys and
 * values like those of {@link PerformanceEvaluation}.
 */
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
// TODO: Remove this in 3.0
public class TestHFileOutputFormat2 {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHFileOutputFormat2.class);

  private final static int ROWSPERSPLIT = 1024;

  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
  private static final byte[][] FAMILIES =
    { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) };
  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", "TestTable3")
    .map(TableName::valueOf).toArray(TableName[]::new);

  private HBaseTestingUtility util = new HBaseTestingUtility();

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class);

  /**
   * Simple mapper that makes KeyValue output.
   */
  static class RandomKVGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
      throws java.io.IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          Bytes.random(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          Bytes.random(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
            context.write(new ImmutableBytesWritable(key), kv);
          }
        }
      }
    }
  }

  /**
   * Simple mapper that makes Put output.
   */
  static class RandomPutGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
      throws java.io.IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          Bytes.random(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          Bytes.random(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
            Put p = new Put(keyBytes);
            p.addColumn(family, QUALIFIER, valBytes);
            // set TTL to very low so that the scan does not return any value
            p.setTTL(1L);
            context.write(new ImmutableBytesWritable(key), p);
          }
        }
      }
    }
  }

  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
    if (putSortReducer) {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomPutGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(Put.class);
    } else {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomKVGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose
   * timestamp is {@link HConstants#LATEST_TIMESTAMP}.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be changed by the call to
      // write. Check that everything in the kv is the same except the ts.
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
      assertNotSame(original.getTimestamp(), kv.getTimestamp());
      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());

      // Test 2. Now pass a kv that has an explicit ts. It should not be
      // changed by the call to write.
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception {
    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
    TaskAttemptContext context =
      hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0");
    return context;
  }

  /**
   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by
   * time-restricted scans.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Timerange dir writing to dir: " + dir);
    try {
      // build a record writer using HFileOutputFormat2
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // Pass two key values with explicit timestamps
      final byte[] b = Bytes.toBytes("b");

      // value 1 with timestamp 2000
      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // value 2 with timestamp 1000
      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // verify that the file has the proper FileInfo.
      writer.close(context);

      // the generated file lives 1 directory down from the attempt directory
      // and is the only file, e.g.
      // _attempt__0000_r_000000_0/b/1979617994050536795
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // open as HFile Reader and pull out TIMERANGE FileInfo.
      HFile.Reader rd =
        HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
      Map<byte[], byte[]> finfo = rd.getHFileInfo();
      byte[] range = finfo.get(Bytes.toBytes("TIMERANGE"));
      assertNotNull(range);

      // unmarshall and check values.
      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
      LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax());
      assertEquals(1000, timeRangeTracker.getMin());
      assertEquals(2000, timeRangeTracker.getMax());
      rd.close();
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Run small MR job.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

    // Lower this value or we OOM in Eclipse.
    conf.setInt("mapreduce.task.io.sort.mb", 20);
    // Write a few files.
    long hregionMaxFilesize = 10 * 1024;
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job, false);
    // This partitioner doesn't work well for number keys but using it anyway
    // just to demonstrate how to configure it.
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];

    Arrays.fill(startKey, (byte) 0);
    Arrays.fill(endKey, (byte) 0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    // Set start and end rows for partitioner.
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
    job.setReducerClass(KeyValueSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    job.setNumReduceTasks(4);
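    // Register the HBase Mutation/Result/KeyValue serializations alongside the Hadoop defaults
    // so the shuffle can move these value types between map and reduce tasks.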
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      KeyValueSerialization.class.getName());

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus[] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);

    // check output file num and size.
    for (byte[] family : FAMILIES) {
      long kvCount = 0;
      RemoteIterator<LocatedFileStatus> iterator =
        fs.listFiles(testDir.suffix("/" + new String(family)), true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(conf, false, false, false);

        kvCount += reader.getEntries();
        scanner.seekTo();
        long perKVSize = scanner.getCell().getSerializedSize();
        assertTrue("Data size of each file should not be too large.",
          perKVSize * reader.getEntries() <= hregionMaxFilesize);
      }
      assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as TTL into the HFile.
   */
  @Test
  public void test_WritingTagData() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
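    // Tags are only persisted from HFile format version 3 onward, so pin the writer to the
    // minimum tag-capable version.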
    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("WritingTagData");
    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      List<Tag> tags = new ArrayList<>();
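      // Attach a TTL tag; the assertions below only verify that the tag type survives the round
      // trip through the HFile.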
      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(conf, false, false, false);
        scanner.seekTo();
        Cell cell = scanner.getCell();
        List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell);
        assertTrue(tagsFromCell.size() > 0);
        for (Tag tag : tagsFromCell) {
          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
        }
      }
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
      util.getDataTestDir("testJobConfiguration").toString());
    Job job = new Job(conf);
    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    setupMockStartKeys(regionLocator);
    setupMockTableName(regionLocator);
    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
    assertEquals(4, job.getNumReduceTasks());
  }

  private byte[][] generateRandomStartKeys(int numKeys) {
    Random random = ThreadLocalRandom.current();
    byte[][] ret = new byte[numKeys][];
    // first region start key is always empty
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  private byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = ThreadLocalRandom.current();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoad() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoad\n");
    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithSplit() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
  }

  /**
   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true. This test can only check the
   * correctness of the original logic if LOCALITY_SENSITIVE_CONF_KEY is set to true. Because
   * MiniHBaseCluster always runs with a single hostname (and different ports), it is not possible
   * to check region locality by comparing region locations with DN hostnames. Once
   * MiniHBaseCluster supports an explicit hostnames parameter (just like MiniDFSCluster does), we
   * can test the region locality feature more easily.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithLocality() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
  }

  // @Ignore("Wahtevs")
  @Test
  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, String tableStr) throws Exception {
    doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer, false,
      Arrays.asList(tableStr));
  }

  @Test
  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true, false,
      Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
  }

  @Test
  public void testMultiMRIncrementalLoadWithPutSortReducerWithNamespaceInPath() throws Exception {
    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducerWithNamespaceInPath\n");
    doIncrementalLoadTest(false, false, true, true,
      Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, boolean shouldWriteToTableWithNamespace, List<String> tableStr)
    throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
    if (shouldWriteToTableWithNamespace) {
      conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true);
    }
    int hostCount = 1;
    int regionNum = 5;
    if (shouldKeepLocality) {
      // Raise the host count above the HDFS replica count once MiniHBaseCluster supports an
      // explicit hostnames parameter, just like MiniDFSCluster does.
      hostCount = 3;
      regionNum = 20;
    }

    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    StartMiniClusterOption option =
      StartMiniClusterOption.builder().numRegionServers(hostCount).dataNodeHosts(hostnames).build();
    util.startMiniCluster(option);

    Map<String, Table> allTables = new HashMap<>(tableStr.size());
    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
    boolean writeMultipleTables = tableStr.size() > 1;
    for (String tableStrSingle : tableStr) {
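      // regionNum - 1 split keys yield regionNum regions per table.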
      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
      TableName tableName = TableName.valueOf(tableStrSingle);
      Table table = util.createTable(tableName, FAMILIES, splitKeys);

      RegionLocator r = util.getConnection().getRegionLocator(tableName);
      assertEquals("Should start with empty table", 0, util.countRows(table));
      int numRegions = r.getStartKeys().length;
      assertEquals("Should make " + regionNum + " regions", regionNum, numRegions);

      allTables.put(tableStrSingle, table);
      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r));
    }
    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
    // Generate the bulk load files
    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
    if (shouldWriteToTableWithNamespace) {
      testDir = new Path(testDir, "default");
    }

    for (Table tableSingle : allTables.values()) {
      // This doesn't write into the table, just makes files
      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
    }
    int numTableDirs = 0;
    FileStatus[] fss = testDir.getFileSystem(conf).listStatus(testDir);
    for (FileStatus tf : fss) {
      Path tablePath = testDir;
      if (writeMultipleTables) {
        if (allTables.containsKey(tf.getPath().getName())) {
          ++numTableDirs;
          tablePath = tf.getPath();
        } else {
          continue;
        }
      }

      // Make sure that a directory was created for every CF
      int dir = 0;
      fss = tablePath.getFileSystem(conf).listStatus(tablePath);
      for (FileStatus f : fss) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
    }
    if (writeMultipleTables) {
      assertEquals("Dir for all input tables not created", allTables.size(), numTableDirs);
    }

    Admin admin = util.getConnection().getAdmin();
    try {
      // handle the split case
      if (shouldChangeRegions) {
        // Choose a semi-random table if multiple tables are available
        Table chosenTable = allTables.values().iterator().next();
        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
        admin.disableTable(chosenTable.getName());
        util.waitUntilNoRegionsInTransition();

        util.deleteTable(chosenTable.getName());
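        // 14 split keys yield 15 regions; the wait loop below checks for exactly 15 locations.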
        byte[][] newSplitKeys = generateRandomSplitKeys(14);
        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);

        while (
          util.getConnection().getRegionLocator(chosenTable.getName()).getAllRegionLocations()
            .size() != 15 || !admin.isTableAvailable(table.getName())
        ) {
          Thread.sleep(200);
          LOG.info("Waiting for new region assignment to happen");
        }
      }

      // Perform the actual load
      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
        Path tableDir = testDir;
        String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString();
        LOG.info("Running LoadIncrementalHFiles on table " + tableNameStr);
        if (writeMultipleTables) {
          tableDir = new Path(testDir, tableNameStr);
        }
        Table currentTable = allTables.get(tableNameStr);
        TableName currentTableName = currentTable.getName();
        new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable,
          singleTableInfo.getRegionLocator());

        // Ensure data shows up
        int expectedRows = 0;
        if (putSortReducer) {
          // no rows should show up: every Put was written with a 1 ms TTL, so all cells expire
          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
            util.countRows(currentTable));
        } else {
          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
            util.countRows(currentTable));
          Scan scan = new Scan();
          ResultScanner results = currentTable.getScanner(scan);
          for (Result res : results) {
            assertEquals(FAMILIES.length, res.rawCells().length);
            Cell first = res.rawCells()[0];
            for (Cell kv : res.rawCells()) {
              assertTrue(CellUtil.matchingRows(first, kv));
              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
            }
          }
          results.close();
        }
        String tableDigestBefore = util.checksumRows(currentTable);
        // Check region locality
        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
          hbd.add(region.getHDFSBlocksDistribution());
        }
        for (String hostname : hostnames) {
          float locality = hbd.getBlockLocalityIndex(hostname);
          LOG.info("locality of [" + hostname + "]: " + locality);
          assertEquals(100, (int) (locality * 100));
        }

        // Cause regions to reopen
        admin.disableTable(currentTableName);
        while (!admin.isTableDisabled(currentTableName)) {
          Thread.sleep(200);
          LOG.info("Waiting for table to disable");
        }
        admin.enableTable(currentTableName);
        util.waitTableAvailable(currentTableName);
        assertEquals("Data should remain after reopening of regions", tableDigestBefore,
          util.checksumRows(currentTable));
      }
    } finally {
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        tableInfoSingle.getRegionLocator().close();
      }
      for (Entry<String, Table> singleTable : allTables.entrySet()) {
        singleTable.getValue().close();
        util.deleteTable(singleTable.getValue().getName());
      }
      testDir.getFileSystem(conf).delete(testDir, true);
      util.shutdownMiniCluster();
    }
  }

  private void runIncrementalPELoad(Configuration conf,
    List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
    throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new Job(conf, "testLocalMRIncrementalLoad");
    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      KeyValueSerialization.class.getName());
    setupRandomGeneratorMapper(job, putSortReducer);
    if (tableInfo.size() > 1) {
      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
      int sum = 0;
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
      }
      assertEquals(sum, job.getNumReduceTasks());
    } else {
      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(),
        regionLocator);
      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
    }

    FileOutputFormat.setOutputPath(job, outDir);

    assertFalse(util.getTestFileSystem().exists(outDir));

    assertTrue(job.waitForCompletion(true));
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the
   * family compression map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Compression.Algorithm> familyToCompression =
        getMockColumnFamiliesForCompression(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForCompression(table, familyToCompression);
      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific compression setting from the configuration
      Map<byte[], Algorithm> retrievedFamilyToCompressionMap =
        HFileOutputFormat2.createFamilyCompressionMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
        assertEquals("Compression configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForCompression(Table table,
    Map<String, Compression.Algorithm> familyToCompression) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to compression algorithms for testing column family
   *         compression. Column family names have special characters
   */
  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the
   * family bloom type map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific bloom type settings from the
      // configuration
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
        HFileOutputFormat2.createFamilyBloomTypeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
    Map<String, BloomType> familyToBloomType) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to bloom filter types for testing column family bloom
   *         filter settings. Column family names have special characters
   */
  private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the
   * family block size map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific block size settings from the
      // configuration
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
        HFileOutputFormat2.createFamilyBlockSizeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
    Map<String, Integer> familyToBlockSize) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to block sizes for testing column family block size
   *         settings. Column family names have special characters
   */
  private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that
   * the family data block encoding map is correctly serialized into and deserialized from
   * configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
        getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
      HTableDescriptor tableDescriptor = table.getTableDescriptor();
      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(
          HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));

      // read back family specific data block encoding settings from the
      // configuration
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
        HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals(
          "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(),
          retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
    Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to data block encodings for testing column family data
   *         block encoding settings. Column family names have special characters
   */
  private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
    }
    return familyToDataBlockEncoding;
  }

  private void setupMockStartKeys(RegionLocator table) throws IOException {
    byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"),
      Bytes.toBytes("ggg"), Bytes.toBytes("zzz") };
    Mockito.doReturn(mockKeys).when(table).getStartKeys();
  }

  private void setupMockTableName(RegionLocator table) throws IOException {
    TableName mockTableName = TableName.valueOf("mock_table");
    Mockito.doReturn(mockTableName).when(table).getName();
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings
   * from the column family descriptor
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testColumnFamilySettings() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("testColumnFamilySettings");

    // Setup table descriptor
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]);
    Mockito.doReturn(htd).when(table).getTableDescriptor();
    for (HColumnDescriptor hcd : HBaseTestingUtility.generateColumnDescriptors()) {
      htd.addFamily(hcd);
    }

    // set up the table to return some mock keys
    setupMockStartKeys(regionLocator);

    try {
      // partial MapReduce setup to get an operational writer for testing
      // We turn off the sequence file compression, because DefaultCodec
      // pollutes the GZip codec pool with an incompatible compressor.
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // write out random rows
      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
      writer.close(context);

      // Make sure that a directory was created for every CF
      FileSystem fs = dir.getFileSystem(conf);

      // commit so that the filesystem has one directory per column family
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(htd.getFamilies().size(), families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
        // verify that the compression on this file matches the configured
        // compression
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();

        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals(
          "Incorrect bloom filter used for column family " + familyStr + " (reader: " + reader + ")",
          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals(
          "Incorrect compression used for column family " + familyStr + " (reader: " + reader + ")",
          hcd.getCompressionType(), reader.getFileContext().getCompression());
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Write random values to the writer assuming a table created using {@link #FAMILIES} as column
   * family descriptors
   */
  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
    TaskAttemptContext context, Set<byte[]> families, int numRows)
    throws IOException, InterruptedException {
    byte[] keyBytes = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte[] valBytes = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
    final byte[] qualifier = Bytes.toBytes("data");
    for (int i = 0; i < numRows; i++) {
      Bytes.putInt(keyBytes, 0, i);
      Bytes.random(valBytes);
      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
      for (byte[] family : families) {
        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
        writer.write(key, kv);
      }
    }
  }

  /**
   * This test covers the scenario from HBASE-6901: all files are bulk loaded and excluded from
   * minor compaction. Without the fix for HBASE-6901, an ArrayIndexOutOfBoundsException would be
   * thrown.
   */
  @Ignore("Flakey: See HBASE-9051")
  @Test
  public void testExcludeAllFromMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    util.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin();
      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
      RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
      final FileSystem fs = util.getDFSCluster().getFileSystem();
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // deep inspection: get the StoreFile dir
      final Path storePath =
        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
            Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // Generate two bulk load files
1216      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
1217
1218      for (int i = 0; i < 2; i++) {
1219        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
1220        runIncrementalPELoad(conf,
1221          Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(),
1222            conn.getRegionLocator(TABLE_NAMES[0]))),
1223          testDir, false);
1224        // Perform the actual load
1225        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator);
1226      }
1227
1228      // Ensure data shows up
1229      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1230      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
1231        util.countRows(table));
1232
1233      // should have a second StoreFile now
1234      assertEquals(2, fs.listStatus(storePath).length);
1235
1236      // minor compactions shouldn't get rid of the file
1237      admin.compact(TABLE_NAMES[0]);
1238      try {
1239        quickPoll(new Callable<Boolean>() {
1240          @Override
1241          public Boolean call() throws Exception {
1242            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1243            for (HRegion region : regions) {
1244              for (HStore store : region.getStores()) {
1245                store.closeAndArchiveCompactedFiles();
1246              }
1247            }
1248            return fs.listStatus(storePath).length == 1;
1249          }
1250        }, 5000);
1251        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1252      } catch (AssertionError ae) {
1253        // expected: bulk-loaded files are excluded from minor compaction, so the poll times out
1254      }
1255
1256      // a major compaction should work though
1257      admin.majorCompact(TABLE_NAMES[0]);
1258      quickPoll(new Callable<Boolean>() {
1259        @Override
1260        public Boolean call() throws Exception {
1261          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1262          for (HRegion region : regions) {
1263            for (HStore store : region.getStores()) {
1264              store.closeAndArchiveCompactedFiles();
1265            }
1266          }
1267          return fs.listStatus(storePath).length == 1;
1268        }
1269      }, 5000);
1270
1271    } finally {
1272      util.shutdownMiniCluster();
1273    }
1274  }
1275
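  /**
   * Like {@link #testExcludeAllFromMinorCompaction()}, but with a flushed StoreFile already
   * present: the store must not be minor-compacted down to a single file because the bulk-loaded
   * file is excluded, while a major compaction still collapses it.
   */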
1276  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
1277  @Test
1278  public void testExcludeMinorCompaction() throws Exception {
1279    Configuration conf = util.getConfiguration();
1280    conf.setInt("hbase.hstore.compaction.min", 2);
1281    generateRandomStartKeys(5);
1282
1283    util.startMiniCluster();
1284    try (Connection conn = ConnectionFactory.createConnection(conf);
1285      Admin admin = conn.getAdmin()) {
1286      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
1287      final FileSystem fs = util.getDFSCluster().getFileSystem();
1288      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1289      assertEquals("Should start with empty table", 0, util.countRows(table));
1290
1291      // deep inspection: get the StoreFile dir
1292      final Path storePath =
1293        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1294          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1295            Bytes.toString(FAMILIES[0])));
1296      assertEquals(0, fs.listStatus(storePath).length);
1297
1298      // put some data in it and flush to create a storefile
1299      Put p = new Put(Bytes.toBytes("test"));
1300      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
1301      table.put(p);
1302      admin.flush(TABLE_NAMES[0]);
1303      assertEquals(1, util.countRows(table));
1304      quickPoll(new Callable<Boolean>() {
1305        @Override
1306        public Boolean call() throws Exception {
1307          return fs.listStatus(storePath).length == 1;
1308        }
1309      }, 5000);
1310
1311      // Generate a bulk load file with more rows
1312      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
1313
1314      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
1315      runIncrementalPELoad(conf,
1316        Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), regionLocator)),
1317        testDir, false);
1318
1319      // Perform the actual load
1320      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
1321
1322      // Ensure data shows up
1323      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1324      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows + 1,
1325        util.countRows(table));
1326
1327      // should have a second StoreFile now
1328      assertEquals(2, fs.listStatus(storePath).length);
1329
1330      // minor compactions shouldn't get rid of the file
1331      admin.compact(TABLE_NAMES[0]);
1332      try {
1333        quickPoll(new Callable<Boolean>() {
1334          @Override
1335          public Boolean call() throws Exception {
1336            return fs.listStatus(storePath).length == 1;
1337          }
1338        }, 5000);
1339        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1340      } catch (AssertionError ae) {
1341        // expected: bulk-loaded files are excluded from minor compaction, so the poll times out
1342      }
1343
1344      // a major compaction should work though
1345      admin.majorCompact(TABLE_NAMES[0]);
1346      quickPoll(new Callable<Boolean>() {
1347        @Override
1348        public Boolean call() throws Exception {
1349          return fs.listStatus(storePath).length == 1;
1350        }
1351      }, 5000);
1352
1353    } finally {
1354      util.shutdownMiniCluster();
1355    }
1356  }
1357
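  /**
   * Polls the given condition every 10ms until it returns true or {@code waitMs} elapses,
   * failing the test on timeout.
   */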
1358  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1359    int sleepMs = 10;
1360    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1361    while (retries-- > 0) {
1362      if (c.call().booleanValue()) {
1363        return;
1364      }
1365      Thread.sleep(sleepMs);
1366    }
1367    fail();
1368  }
1369
1370  public static void main(String[] args) throws Exception {
1371    new TestHFileOutputFormat2().manualTest(args);
1372  }
1373
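  /**
   * Manual driver: {@code newtable <table>} creates a pre-split table, while
   * {@code incremental <table>} runs an incremental load against an existing table, writing the
   * HFiles to an {@code incremental-out} directory.
   */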
1374  public void manualTest(String[] args) throws Exception {
1375    Configuration conf = HBaseConfiguration.create();
1376    util = new HBaseTestingUtility(conf);
1377    if ("newtable".equals(args[0])) {
1378      TableName tname = TableName.valueOf(args[1]);
1379      byte[][] splitKeys = generateRandomSplitKeys(4);
1380      Table table = util.createTable(tname, FAMILIES, splitKeys);
1381    } else if ("incremental".equals(args[0])) {
1382      TableName tname = TableName.valueOf(args[1]);
1383      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
1384        RegionLocator regionLocator = c.getRegionLocator(tname)) {
1385        Path outDir = new Path("incremental-out");
1386        runIncrementalPELoad(conf,
1387          Arrays.asList(
1388            new HFileOutputFormat2.TableInfo(admin.getTableDescriptor(tname), regionLocator)),
1389          outDir, false);
1390      }
1391    } else {
1392      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental");
1393    }
1394  }
1395
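  /**
   * Verifies that {@code HFileOutputFormat2#configureStoragePolicy} applies the configured block
   * storage policies: the job-wide default (ALL_SSD) and a per-column-family override (ONE_SSD).
   */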
1396  @Test
1397  public void testBlockStoragePolicy() throws Exception {
1398    util = new HBaseTestingUtility();
1399    Configuration conf = util.getConfiguration();
1400    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
1401
1402    conf.set(
1403      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
1404        .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
1405      "ONE_SSD");
1406    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
1407    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
1408    util.startMiniDFSCluster(3);
1409    FileSystem fs = util.getDFSCluster().getFileSystem();
1410    try {
1411      fs.mkdirs(cf1Dir);
1412      fs.mkdirs(cf2Dir);
1413
1414      // the default block storage policy is HOT
1415      String spA = getStoragePolicyName(fs, cf1Dir);
1416      String spB = getStoragePolicyName(fs, cf2Dir);
1417      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1418      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1419      assertEquals("HOT", spA);
1420      assertEquals("HOT", spB);
1421
1422      // alter table cf schema to change storage policies
1423      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1424        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
1425      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1426        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
1427      spA = getStoragePolicyName(fs, cf1Dir);
1428      spB = getStoragePolicyName(fs, cf2Dir);
1429      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1430      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1431      assertNotNull(spA);
1432      assertEquals("ONE_SSD", spA);
1433      assertNotNull(spB);
1434      assertEquals("ALL_SSD", spB);
1435    } finally {
1436      fs.delete(cf1Dir, true);
1437      fs.delete(cf2Dir, true);
1438      util.shutdownMiniDFSCluster();
1439    }
1440  }
1441
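  /**
   * Returns the storage policy name of {@code path}, resolved reflectively via
   * {@code FileSystem#getStoragePolicy}; falls back to
   * {@link #getStoragePolicyNameForOldHDFSVersion(FileSystem, Path)} on older HDFS versions that
   * lack the method, defaulting to HOT.
   */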
1442  private String getStoragePolicyName(FileSystem fs, Path path) {
1443    try {
1444      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
1445      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
1446    } catch (Exception e) {
1447      // May fail when running against an old HDFS version; fall back to the old way
1448      if (LOG.isTraceEnabled()) {
1449        LOG.trace("Failed to get policy directly", e);
1450      }
1451      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
1452      return policy == null ? "HOT" : policy; // HOT is the default policy
1453    }
1454  }
1455
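  /**
   * Fallback for HDFS versions without {@code FileSystem#getStoragePolicy}: reads the policy id
   * from the file status and maps it to a name via
   * {@code DistributedFileSystem#getStoragePolicies}, returning null when the policy cannot be
   * determined.
   */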
1456  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
1457    try {
1458      if (fs instanceof DistributedFileSystem) {
1459        DistributedFileSystem dfs = (DistributedFileSystem) fs;
1460        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
1461        if (null != status) {
1462          byte storagePolicyId = status.getStoragePolicy();
1463          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
1464          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
1465            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
1466            for (BlockStoragePolicy policy : policies) {
1467              if (policy.getId() == storagePolicyId) {
1468                return policy.getName();
1469              }
1470            }
1471          }
1472        }
1473      }
1474    } catch (Throwable e) {
1475      LOG.warn("Failed to get block storage policy of [" + path + "]", e);
1476    }
1477
1478    return null;
1479  }
1480
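  /**
   * Verifies that {@code HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY} overrides the
   * compression codec of the written HFiles (gz in this test).
   */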
1481  @Test
1482  public void testConfigureCompression() throws Exception {
1483    Configuration conf = new Configuration(this.util.getConfiguration());
1484    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1485    TaskAttemptContext context = null;
1486    Path dir = util.getDataTestDir("TestConfigureCompression");
1487    String hfileoutputformatCompression = "gz";
1488
1489    try {
1490      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
1491      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1492
1493      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);
1494
1495      Job job = Job.getInstance(conf);
1496      FileOutputFormat.setOutputPath(job, dir);
1497      context = createTestTaskAttemptContext(job);
1498      HFileOutputFormat2 hof = new HFileOutputFormat2();
1499      writer = hof.getRecordWriter(context);
1500      final byte[] b = Bytes.toBytes("b");
1501
1502      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
1503      writer.write(new ImmutableBytesWritable(), kv);
1504      writer.close(context);
1505      writer = null;
1506      FileSystem fs = dir.getFileSystem(conf);
1507      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
1508      while (iterator.hasNext()) {
1509        LocatedFileStatus keyFileStatus = iterator.next();
1510        HFile.Reader reader =
1511          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
1512        assertEquals(hfileoutputformatCompression,
1513          reader.getTrailer().getCompressionCodec().getName());
1514      }
1515    } finally {
1516      if (writer != null && context != null) {
1517        writer.close(context);
1518      }
1519      dir.getFileSystem(conf).delete(dir, true);
1520    }
1521
1522  }
1523
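  /**
   * Verifies that when HFiles are generated for a remote cluster (B) while the job itself runs
   * against cluster A, the remote cluster's ZooKeeper settings and any
   * {@code REMOTE_CLUSTER_CONF_PREFIX} overrides are propagated to the connections created by
   * the tasks.
   */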
1524  @Test
1525  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
1526    // Start cluster A
1527    util = new HBaseTestingUtility();
1528    Configuration confA = util.getConfiguration();
1529    int hostCount = 3;
1530    int regionNum = 20;
1531    String[] hostnames = new String[hostCount];
1532    for (int i = 0; i < hostCount; ++i) {
1533      hostnames[i] = "datanode_" + i;
1534    }
1535    StartMiniClusterOption option =
1536      StartMiniClusterOption.builder().numRegionServers(hostCount).dataNodeHosts(hostnames).build();
1537    util.startMiniCluster(option);
1538
1539    // Start cluster B
1540    HBaseTestingUtility utilB = new HBaseTestingUtility();
1541    Configuration confB = utilB.getConfiguration();
1542    utilB.startMiniCluster(option);
1543
1544    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
1545
1546    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
1547    TableName tableName = TableName.valueOf("table");
1548    // Create table in cluster B
1549    try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
1550      RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
1551      // Generate the bulk load files
1552      // The job carries cluster A's ZooKeeper configuration
1553      // Simulate reading from cluster A via TableInputFormat while writing HFiles for cluster B
1554      Job job = Job.getInstance(confA, "testLocalMRIncrementalLoad");
1555      Configuration jobConf = job.getConfiguration();
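      // Install the capturing Connection implementation so the Configuration used by each
      // task-side connection can be inspected after the job finishes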
1556      final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
1557      job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
1558      setupRandomGeneratorMapper(job, false);
1559      HFileOutputFormat2.configureIncrementalLoad(job, table, r);
1560
1561      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
1562        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
1563      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
1564        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
1565      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
1566        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));
1567
1568      String bSpecificConfigKey = "my.override.config.for.b";
1569      String bSpecificConfigValue = "b-specific-value";
1570      jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX + bSpecificConfigKey,
1571        bSpecificConfigValue);
1572
1573      FileOutputFormat.setOutputPath(job, testDir);
1574
1575      assertFalse(util.getTestFileSystem().exists(testDir));
1576
1577      assertTrue(job.waitForCompletion(true));
1578
1579      final List<Configuration> configs =
1580        ConfigurationCaptorConnection.getCapturedConfigurations(key);
1581
1582      assertFalse(configs.isEmpty());
1583      for (Configuration config : configs) {
1584        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
1585          config.get(HConstants.ZOOKEEPER_QUORUM));
1586        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
1587          config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
1588        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
1589          config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
1590
1591        assertEquals(bSpecificConfigValue, config.get(bSpecificConfigKey));
1592      }
1593    } finally {
1594      utilB.deleteTable(tableName);
1595      testDir.getFileSystem(confA).delete(testDir, true);
1596      util.shutdownMiniCluster();
1597      utilB.shutdownMiniCluster();
1598    }
1599  }
1600
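  /**
   * Connection implementation that records the Configuration it was created with (keyed by a
   * UUID stored in the config) and delegates everything else to a real connection, so tests can
   * verify which settings task-side connections actually receive.
   */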
1601  private static class ConfigurationCaptorConnection implements Connection {
1602    private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";
1603
1604    private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();
1605
1606    private final Connection delegate;
1607
1608    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user,
1609      Map<String, byte[]> connectionAttributes) throws IOException {
1610      Configuration confForDelegate = new Configuration(conf);
1611      confForDelegate.unset(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL);
1612      delegate = createConnection(confForDelegate, es, user, connectionAttributes);
1613      final String uuid = conf.get(UUID_KEY);
1614      if (uuid != null) {
1615        confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
1616      }
1617    }
1618
1619    static UUID configureConnectionImpl(Configuration conf) {
1620      conf.setClass(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
1621        ConfigurationCaptorConnection.class, Connection.class);
1622
1623      final UUID uuid = UUID.randomUUID();
1624      conf.set(UUID_KEY, uuid.toString());
1625      return uuid;
1626    }
1627
1628    static List<Configuration> getCapturedConfigurations(UUID key) {
1629      return confs.get(key);
1630    }
1631
1632    @Override
1633    public Configuration getConfiguration() {
1634      return delegate.getConfiguration();
1635    }
1636
1637    @Override
1638    public Table getTable(TableName tableName) throws IOException {
1639      return delegate.getTable(tableName);
1640    }
1641
1642    @Override
1643    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
1644      return delegate.getTable(tableName, pool);
1645    }
1646
1647    @Override
1648    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
1649      return delegate.getBufferedMutator(tableName);
1650    }
1651
1652    @Override
1653    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
1654      return delegate.getBufferedMutator(params);
1655    }
1656
1657    @Override
1658    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
1659      return delegate.getRegionLocator(tableName);
1660    }
1661
1662    @Override
1663    public void clearRegionLocationCache() {
1664      delegate.clearRegionLocationCache();
1665    }
1666
1667    @Override
1668    public Admin getAdmin() throws IOException {
1669      return delegate.getAdmin();
1670    }
1671
1672    @Override
1673    public void close() throws IOException {
1674      delegate.close();
1675    }
1676
1677    @Override
1678    public boolean isClosed() {
1679      return delegate.isClosed();
1680    }
1681
1682    @Override
1683    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
1684      return delegate.getTableBuilder(tableName, pool);
1685    }
1686
1687    @Override
1688    public Hbck getHbck() throws IOException {
1689      return delegate.getHbck();
1690    }
1691
1692    @Override
1693    public Hbck getHbck(ServerName masterServer) throws IOException {
1694      return delegate.getHbck(masterServer);
1695    }
1696
1697    @Override
1698    public void abort(String why, Throwable e) {
1699      delegate.abort(why, e);
1700    }
1701
1702    @Override
1703    public boolean isAborted() {
1704      return delegate.isAborted();
1705    }
1706
1707    @Override
1708    public String getClusterId() {
1709      return delegate.getClusterId();
1710    }
1711  }
1712}