001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNotSame;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027import static org.mockito.Mockito.verify;
028
029import java.io.IOException;
030import java.lang.reflect.Field;
031import java.security.PrivilegedAction;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.HashMap;
035import java.util.LinkedList;
036import java.util.List;
037import java.util.Map;
038import java.util.Map.Entry;
039import java.util.Random;
040import java.util.Set;
041import java.util.UUID;
042import java.util.concurrent.Callable;
043import java.util.concurrent.ConcurrentHashMap;
044import java.util.concurrent.CopyOnWriteArrayList;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.ThreadLocalRandom;
047import java.util.stream.Collectors;
048import java.util.stream.Stream;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.fs.FileStatus;
051import org.apache.hadoop.fs.FileSystem;
052import org.apache.hadoop.fs.LocatedFileStatus;
053import org.apache.hadoop.fs.Path;
054import org.apache.hadoop.fs.RemoteIterator;
055import org.apache.hadoop.hbase.ArrayBackedTag;
056import org.apache.hadoop.hbase.Cell;
057import org.apache.hadoop.hbase.CellUtil;
058import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
059import org.apache.hadoop.hbase.ExtendedCell;
060import org.apache.hadoop.hbase.HBaseClassTestRule;
061import org.apache.hadoop.hbase.HBaseConfiguration;
062import org.apache.hadoop.hbase.HBaseTestingUtil;
063import org.apache.hadoop.hbase.HConstants;
064import org.apache.hadoop.hbase.HDFSBlocksDistribution;
065import org.apache.hadoop.hbase.HadoopShims;
066import org.apache.hadoop.hbase.KeyValue;
067import org.apache.hadoop.hbase.PrivateCellUtil;
068import org.apache.hadoop.hbase.ServerName;
069import org.apache.hadoop.hbase.StartTestingClusterOption;
070import org.apache.hadoop.hbase.TableName;
071import org.apache.hadoop.hbase.Tag;
072import org.apache.hadoop.hbase.TagType;
073import org.apache.hadoop.hbase.client.Admin;
074import org.apache.hadoop.hbase.client.AsyncConnection;
075import org.apache.hadoop.hbase.client.BufferedMutator;
076import org.apache.hadoop.hbase.client.BufferedMutatorParams;
077import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
078import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
079import org.apache.hadoop.hbase.client.Connection;
080import org.apache.hadoop.hbase.client.ConnectionFactory;
081import org.apache.hadoop.hbase.client.ConnectionRegistry;
082import org.apache.hadoop.hbase.client.ConnectionUtils;
083import org.apache.hadoop.hbase.client.Hbck;
084import org.apache.hadoop.hbase.client.Put;
085import org.apache.hadoop.hbase.client.RegionLocator;
086import org.apache.hadoop.hbase.client.Result;
087import org.apache.hadoop.hbase.client.ResultScanner;
088import org.apache.hadoop.hbase.client.Scan;
089import org.apache.hadoop.hbase.client.Table;
090import org.apache.hadoop.hbase.client.TableBuilder;
091import org.apache.hadoop.hbase.client.TableDescriptor;
092import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
093import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
094import org.apache.hadoop.hbase.io.compress.Compression;
095import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
096import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
097import org.apache.hadoop.hbase.io.hfile.CacheConfig;
098import org.apache.hadoop.hbase.io.hfile.HFile;
099import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
100import org.apache.hadoop.hbase.io.hfile.HFileScanner;
101import org.apache.hadoop.hbase.regionserver.BloomType;
102import org.apache.hadoop.hbase.regionserver.HRegion;
103import org.apache.hadoop.hbase.regionserver.HStore;
104import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
105import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
106import org.apache.hadoop.hbase.security.User;
107import org.apache.hadoop.hbase.testclassification.LargeTests;
108import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
109import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
110import org.apache.hadoop.hbase.util.Bytes;
111import org.apache.hadoop.hbase.util.CommonFSUtils;
112import org.apache.hadoop.hbase.util.FSUtils;
113import org.apache.hadoop.hbase.util.FutureUtils;
114import org.apache.hadoop.hbase.util.ReflectionUtils;
115import org.apache.hadoop.hdfs.DistributedFileSystem;
116import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
117import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
118import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
119import org.apache.hadoop.io.NullWritable;
120import org.apache.hadoop.mapreduce.Job;
121import org.apache.hadoop.mapreduce.Mapper;
122import org.apache.hadoop.mapreduce.RecordWriter;
123import org.apache.hadoop.mapreduce.TaskAttemptContext;
124import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
125import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
126import org.apache.hadoop.security.UserGroupInformation;
127import org.junit.Assert;
128import org.junit.ClassRule;
129import org.junit.Ignore;
130import org.junit.Test;
131import org.junit.experimental.categories.Category;
132import org.mockito.Mockito;
133import org.slf4j.Logger;
134import org.slf4j.LoggerFactory;
135
136/**
137 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile
138 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and
139 * values.
140 */
141@Category({ VerySlowMapReduceTests.class, LargeTests.class })
142public class TestHFileOutputFormat2 {
143
144  @ClassRule
145  public static final HBaseClassTestRule CLASS_RULE =
146    HBaseClassTestRule.forClass(TestHFileOutputFormat2.class);
147
148  private final static int ROWSPERSPLIT = 1024;
149  private static final int DEFAULT_VALUE_LENGTH = 1000;
150
151  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
152  private static final byte[][] FAMILIES =
153    { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) };
154  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", "TestTable3")
155    .map(TableName::valueOf).toArray(TableName[]::new);
156
157  private HBaseTestingUtil util = new HBaseTestingUtil();
158
159  private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class);
160
161  /**
162   * Simple mapper that makes KeyValue output.
163   */
164  static class RandomKVGeneratingMapper
165    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {
166
167    private int keyLength;
168    private static final int KEYLEN_DEFAULT = 10;
169    private static final String KEYLEN_CONF = "randomkv.key.length";
170
171    private int valLength;
172    private static final int VALLEN_DEFAULT = 10;
173    private static final String VALLEN_CONF = "randomkv.val.length";
174    private static final byte[] QUALIFIER = Bytes.toBytes("data");
175    private boolean multiTableMapper = false;
176    private TableName[] tables = null;
177
178    @Override
179    protected void setup(Context context) throws IOException, InterruptedException {
180      super.setup(context);
181
182      Configuration conf = context.getConfiguration();
183      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
184      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
185      multiTableMapper =
186        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
187      if (multiTableMapper) {
188        tables = TABLE_NAMES;
189      } else {
190        tables = new TableName[] { TABLE_NAMES[0] };
191      }
192    }
193
194    @Override
195    protected void map(NullWritable n1, NullWritable n2,
196      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
197      throws java.io.IOException, InterruptedException {
198
199      byte keyBytes[] = new byte[keyLength];
200      byte valBytes[] = new byte[valLength];
201
202      int taskId = context.getTaskAttemptID().getTaskID().getId();
203      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
204      byte[] key;
205      for (int j = 0; j < tables.length; ++j) {
206        for (int i = 0; i < ROWSPERSPLIT; i++) {
207          Bytes.random(keyBytes);
208          // Ensure that unique tasks generate unique keys
209          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
210          Bytes.random(valBytes);
211          key = keyBytes;
212          if (multiTableMapper) {
213            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
214          }
215
216          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
217            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
218            context.write(new ImmutableBytesWritable(key), kv);
219          }
220        }
221      }
222    }
223  }
224
225  /**
226   * Simple mapper that makes Put output.
227   */
228  static class RandomPutGeneratingMapper
229    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> {
230
231    private int keyLength;
232    private static final int KEYLEN_DEFAULT = 10;
233    private static final String KEYLEN_CONF = "randomkv.key.length";
234
235    private int valLength;
236    private static final int VALLEN_DEFAULT = 10;
237    private static final String VALLEN_CONF = "randomkv.val.length";
238    private static final byte[] QUALIFIER = Bytes.toBytes("data");
239    private boolean multiTableMapper = false;
240    private TableName[] tables = null;
241
242    @Override
243    protected void setup(Context context) throws IOException, InterruptedException {
244      super.setup(context);
245
246      Configuration conf = context.getConfiguration();
247      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
248      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
249      multiTableMapper =
250        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
251      if (multiTableMapper) {
252        tables = TABLE_NAMES;
253      } else {
254        tables = new TableName[] { TABLE_NAMES[0] };
255      }
256    }
257
258    @Override
259    protected void map(NullWritable n1, NullWritable n2,
260      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
261      throws java.io.IOException, InterruptedException {
262
263      byte keyBytes[] = new byte[keyLength];
264      byte valBytes[] = new byte[valLength];
265
266      int taskId = context.getTaskAttemptID().getTaskID().getId();
267      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
268
269      byte[] key;
270      for (int j = 0; j < tables.length; ++j) {
271        for (int i = 0; i < ROWSPERSPLIT; i++) {
272          Bytes.random(keyBytes);
273          // Ensure that unique tasks generate unique keys
274          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
275          Bytes.random(valBytes);
276          key = keyBytes;
277          if (multiTableMapper) {
278            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
279          }
280
281          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
282            Put p = new Put(keyBytes);
283            p.addColumn(family, QUALIFIER, valBytes);
284            // set TTL to very low so that the scan does not return any value
285            p.setTTL(1l);
286            context.write(new ImmutableBytesWritable(key), p);
287          }
288        }
289      }
290    }
291  }
292
293  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
294    if (putSortReducer) {
295      job.setInputFormatClass(NMapInputFormat.class);
296      job.setMapperClass(RandomPutGeneratingMapper.class);
297      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
298      job.setMapOutputValueClass(Put.class);
299    } else {
300      job.setInputFormatClass(NMapInputFormat.class);
301      job.setMapperClass(RandomKVGeneratingMapper.class);
302      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
303      job.setMapOutputValueClass(KeyValue.class);
304    }
305  }
306
307  /**
308   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose
309   * timestamp is {@link HConstants#LATEST_TIMESTAMP}.
310   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
311   */
312  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
313  @Test
314  public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
315    Configuration conf = new Configuration(this.util.getConfiguration());
316    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
317    TaskAttemptContext context = null;
318    Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
319    try {
320      Job job = new Job(conf);
321      FileOutputFormat.setOutputPath(job, dir);
322      context = createTestTaskAttemptContext(job);
323      HFileOutputFormat2 hof = new HFileOutputFormat2();
324      writer = hof.getRecordWriter(context);
325      final byte[] b = Bytes.toBytes("b");
326
327      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be
328      // changed by call to write. Check all in kv is same but ts.
329      KeyValue kv = new KeyValue(b, b, b);
330      KeyValue original = kv.clone();
331      writer.write(new ImmutableBytesWritable(), kv);
332      assertFalse(original.equals(kv));
333      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
334      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
335      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
336      assertNotSame(original.getTimestamp(), kv.getTimestamp());
337      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());
338
339      // Test 2. Now test passing a kv that has explicit ts. It should not be
340      // changed by call to record write.
341      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
342      original = kv.clone();
343      writer.write(new ImmutableBytesWritable(), kv);
344      assertTrue(original.equals(kv));
345    } finally {
346      if (writer != null && context != null) writer.close(context);
347      dir.getFileSystem(conf).delete(dir, true);
348    }
349  }
350
351  private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception {
352    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
353    TaskAttemptContext context =
354      hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0");
355    return context;
356  }
357
358  /*
359   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by
360   * time-restricted scans.
361   */
362  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
363  @Test
364  public void test_TIMERANGE() throws Exception {
365    Configuration conf = new Configuration(this.util.getConfiguration());
366    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
367    TaskAttemptContext context = null;
368    Path dir = util.getDataTestDir("test_TIMERANGE_present");
369    LOG.info("Timerange dir writing to dir: " + dir);
370    try {
371      // build a record writer using HFileOutputFormat2
372      Job job = new Job(conf);
373      FileOutputFormat.setOutputPath(job, dir);
374      context = createTestTaskAttemptContext(job);
375      HFileOutputFormat2 hof = new HFileOutputFormat2();
376      writer = hof.getRecordWriter(context);
377
378      // Pass two key values with explicit times stamps
379      final byte[] b = Bytes.toBytes("b");
380
381      // value 1 with timestamp 2000
382      KeyValue kv = new KeyValue(b, b, b, 2000, b);
383      KeyValue original = kv.clone();
384      writer.write(new ImmutableBytesWritable(), kv);
385      assertEquals(original, kv);
386
387      // value 2 with timestamp 1000
388      kv = new KeyValue(b, b, b, 1000, b);
389      original = kv.clone();
390      writer.write(new ImmutableBytesWritable(), kv);
391      assertEquals(original, kv);
392
393      // verify that the file has the proper FileInfo.
394      writer.close(context);
395
396      // the generated file lives 1 directory down from the attempt directory
397      // and is the only file, e.g.
398      // _attempt__0000_r_000000_0/b/1979617994050536795
399      FileSystem fs = FileSystem.get(conf);
400      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
401      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
402      FileStatus[] file = fs.listStatus(sub1[0].getPath());
403
404      // open as HFile Reader and pull out TIMERANGE FileInfo.
405      HFile.Reader rd =
406        HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
407      Map<byte[], byte[]> finfo = rd.getHFileInfo();
408      byte[] range = finfo.get(Bytes.toBytes("TIMERANGE"));
409      assertNotNull(range);
410
411      // unmarshall and check values.
412      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
413      LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax());
414      assertEquals(1000, timeRangeTracker.getMin());
415      assertEquals(2000, timeRangeTracker.getMax());
416      rd.close();
417    } finally {
418      if (writer != null && context != null) writer.close(context);
419      dir.getFileSystem(conf).delete(dir, true);
420    }
421  }
422
423  /**
424   * Run small MR job.
425   */
426  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
427  @Test
428  public void testWritingPEData() throws Exception {
429    Configuration conf = util.getConfiguration();
430    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
431    FileSystem fs = testDir.getFileSystem(conf);
432
433    // Set down this value or we OOME in eclipse.
434    conf.setInt("mapreduce.task.io.sort.mb", 20);
435    // Write a few files.
436    long hregionMaxFilesize = 10 * 1024;
437    conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);
438
439    Job job = new Job(conf, "testWritingPEData");
440    setupRandomGeneratorMapper(job, false);
441    // This partitioner doesn't work well for number keys but using it anyways
442    // just to demonstrate how to configure it.
443    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
444    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
445
446    Arrays.fill(startKey, (byte) 0);
447    Arrays.fill(endKey, (byte) 0xff);
448
449    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
450    // Set start and end rows for partitioner.
451    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
452    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
453    job.setReducerClass(CellSortReducer.class);
454    job.setOutputFormatClass(HFileOutputFormat2.class);
455    job.setNumReduceTasks(4);
456    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
457      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
458      CellSerialization.class.getName());
459
460    FileOutputFormat.setOutputPath(job, testDir);
461    assertTrue(job.waitForCompletion(false));
462    FileStatus[] files = fs.listStatus(testDir);
463    assertTrue(files.length > 0);
464
465    // check output file num and size.
466    for (byte[] family : FAMILIES) {
467      long kvCount = 0;
468      RemoteIterator<LocatedFileStatus> iterator =
469        fs.listFiles(testDir.suffix("/" + new String(family)), true);
470      while (iterator.hasNext()) {
471        LocatedFileStatus keyFileStatus = iterator.next();
472        HFile.Reader reader =
473          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
474        HFileScanner scanner = reader.getScanner(conf, false, false, false);
475
476        kvCount += reader.getEntries();
477        scanner.seekTo();
478        long perKVSize = scanner.getCell().getSerializedSize();
479        assertTrue("Data size of each file should not be too large.",
480          perKVSize * reader.getEntries() <= hregionMaxFilesize);
481      }
482      assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount);
483    }
484  }
485
486  /**
487   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile.
488   */
489  @Test
490  public void test_WritingTagData() throws Exception {
491    Configuration conf = new Configuration(this.util.getConfiguration());
492    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
493    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
494    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
495    TaskAttemptContext context = null;
496    Path dir = util.getDataTestDir("WritingTagData");
497    try {
498      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
499      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
500      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
501      Job job = new Job(conf);
502      FileOutputFormat.setOutputPath(job, dir);
503      context = createTestTaskAttemptContext(job);
504      HFileOutputFormat2 hof = new HFileOutputFormat2();
505      writer = hof.getRecordWriter(context);
506      final byte[] b = Bytes.toBytes("b");
507
508      List<Tag> tags = new ArrayList<>();
509      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
510      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
511      writer.write(new ImmutableBytesWritable(), kv);
512      writer.close(context);
513      writer = null;
514      FileSystem fs = dir.getFileSystem(conf);
515      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
516      while (iterator.hasNext()) {
517        LocatedFileStatus keyFileStatus = iterator.next();
518        HFile.Reader reader =
519          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
520        HFileScanner scanner = reader.getScanner(conf, false, false, false);
521        scanner.seekTo();
522        ExtendedCell cell = scanner.getCell();
523        List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell);
524        assertTrue(tagsFromCell.size() > 0);
525        for (Tag tag : tagsFromCell) {
526          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
527        }
528      }
529    } finally {
530      if (writer != null && context != null) writer.close(context);
531      dir.getFileSystem(conf).delete(dir, true);
532    }
533  }
534
535  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
536  @Test
537  public void testJobConfiguration() throws Exception {
538    Configuration conf = new Configuration(this.util.getConfiguration());
539    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
540      util.getDataTestDir("testJobConfiguration").toString());
541    Job job = new Job(conf);
542    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
543    Table table = Mockito.mock(Table.class);
544    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
545    setupMockStartKeys(regionLocator);
546    setupMockTableName(regionLocator);
547    HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
548    assertEquals(job.getNumReduceTasks(), 4);
549  }
550
551  private byte[][] generateRandomStartKeys(int numKeys) {
552    Random random = ThreadLocalRandom.current();
553    byte[][] ret = new byte[numKeys][];
554    // first region start key is always empty
555    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
556    for (int i = 1; i < numKeys; i++) {
557      ret[i] = generateData(random, DEFAULT_VALUE_LENGTH);
558    }
559    return ret;
560  }
561
562  /*
563   * This method takes some time and is done inline uploading data. For example, doing the mapfile
564   * test, generation of the key and value consumes about 30% of CPU time.
565   * @return Generated random value to insert into a table cell.
566   */
567  public static byte[] generateData(final Random r, int length) {
568    byte[] b = new byte[length];
569    int i;
570
571    for (i = 0; i < (length - 8); i += 8) {
572      b[i] = (byte) (65 + r.nextInt(26));
573      b[i + 1] = b[i];
574      b[i + 2] = b[i];
575      b[i + 3] = b[i];
576      b[i + 4] = b[i];
577      b[i + 5] = b[i];
578      b[i + 6] = b[i];
579      b[i + 7] = b[i];
580    }
581
582    byte a = (byte) (65 + r.nextInt(26));
583    for (; i < length; i++) {
584      b[i] = a;
585    }
586    return b;
587  }
588
589  private byte[][] generateRandomSplitKeys(int numKeys) {
590    Random random = ThreadLocalRandom.current();
591    byte[][] ret = new byte[numKeys][];
592    for (int i = 0; i < numKeys; i++) {
593      ret[i] = generateData(random, DEFAULT_VALUE_LENGTH);
594    }
595    return ret;
596  }
597
598  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
599  @Test
600  public void testMRIncrementalLoad() throws Exception {
601    LOG.info("\nStarting test testMRIncrementalLoad\n");
602    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
603  }
604
605  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
606  @Test
607  public void testMRIncrementalLoadWithSplit() throws Exception {
608    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
609    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
610  }
611
612  /**
613   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true This test could only check the
614   * correctness of original logic if LOCALITY_SENSITIVE_CONF_KEY is set to true. Because
615   * MiniHBaseCluster always run with single hostname (and different ports), it's not possible to
616   * check the region locality by comparing region locations and DN hostnames. When MiniHBaseCluster
617   * supports explicit hostnames parameter (just like MiniDFSCluster does), we could test region
618   * locality features more easily.
619   */
620  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
621  @Test
622  public void testMRIncrementalLoadWithLocality() throws Exception {
623    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
624    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
625    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
626  }
627
628  // @Ignore("Wahtevs")
629  @Test
630  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
631    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
632    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
633  }
634
635  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
636    boolean putSortReducer, String tableStr) throws Exception {
637    doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
638      Arrays.asList(tableStr));
639  }
640
641  @Test
642  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
643    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
644    doIncrementalLoadTest(false, false, true,
645      Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
646  }
647
648  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
649    boolean putSortReducer, List<String> tableStr) throws Exception {
650    util = new HBaseTestingUtil();
651    Configuration conf = util.getConfiguration();
652    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
653    int hostCount = 1;
654    int regionNum = 5;
655    if (shouldKeepLocality) {
656      // We should change host count higher than hdfs replica count when MiniHBaseCluster supports
657      // explicit hostnames parameter just like MiniDFSCluster does.
658      hostCount = 3;
659      regionNum = 20;
660    }
661
662    String[] hostnames = new String[hostCount];
663    for (int i = 0; i < hostCount; ++i) {
664      hostnames[i] = "datanode_" + i;
665    }
666    StartTestingClusterOption option = StartTestingClusterOption.builder()
667      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
668    util.startMiniCluster(option);
669
670    Map<String, Table> allTables = new HashMap<>(tableStr.size());
671    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
672    boolean writeMultipleTables = tableStr.size() > 1;
673    for (String tableStrSingle : tableStr) {
674      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
675      TableName tableName = TableName.valueOf(tableStrSingle);
676      Table table = util.createTable(tableName, FAMILIES, splitKeys);
677
678      RegionLocator r = util.getConnection().getRegionLocator(tableName);
679      assertEquals("Should start with empty table", 0, util.countRows(table));
680      int numRegions = r.getStartKeys().length;
681      assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
682
683      allTables.put(tableStrSingle, table);
684      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getDescriptor(), r));
685    }
686    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
687    // Generate the bulk load files
688    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
689    if (writeMultipleTables) {
690      testDir = new Path(testDir, "default");
691    }
692
693    for (Table tableSingle : allTables.values()) {
694      // This doesn't write into the table, just makes files
695      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
696    }
697    int numTableDirs = 0;
698    FileStatus[] fss = testDir.getFileSystem(conf).listStatus(testDir);
699    for (FileStatus tf : fss) {
700      Path tablePath = testDir;
701      if (writeMultipleTables) {
702        if (allTables.containsKey(tf.getPath().getName())) {
703          ++numTableDirs;
704          tablePath = tf.getPath();
705        } else {
706          continue;
707        }
708      }
709
710      // Make sure that a directory was created for every CF
711      int dir = 0;
712      fss = tablePath.getFileSystem(conf).listStatus(tablePath);
713      for (FileStatus f : fss) {
714        for (byte[] family : FAMILIES) {
715          if (Bytes.toString(family).equals(f.getPath().getName())) {
716            ++dir;
717          }
718        }
719      }
720      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
721    }
722    if (writeMultipleTables) {
723      assertEquals("Dir for all input tables not created", numTableDirs, allTables.size());
724    }
725
726    Admin admin = util.getConnection().getAdmin();
727    try {
728      // handle the split case
729      if (shouldChangeRegions) {
730        Table chosenTable = allTables.values().iterator().next();
731        // Choose a semi-random table if multiple tables are available
732        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
733        admin.disableTable(chosenTable.getName());
734        util.waitUntilNoRegionsInTransition();
735
736        util.deleteTable(chosenTable.getName());
737        byte[][] newSplitKeys = generateRandomSplitKeys(14);
738        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);
739
740        while (
741          util.getConnection().getRegionLocator(chosenTable.getName()).getAllRegionLocations()
742            .size() != 15 || !admin.isTableAvailable(table.getName())
743        ) {
744          Thread.sleep(200);
745          LOG.info("Waiting for new region assignment to happen");
746        }
747      }
748
749      // Perform the actual load
750      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
751        Path tableDir = testDir;
752        String tableNameStr = singleTableInfo.getTableDescriptor().getTableName().getNameAsString();
753        LOG.info("Running BulkLoadHFiles on table" + tableNameStr);
754        if (writeMultipleTables) {
755          tableDir = new Path(testDir, tableNameStr);
756        }
757        Table currentTable = allTables.get(tableNameStr);
758        TableName currentTableName = currentTable.getName();
759        BulkLoadHFiles.create(conf).bulkLoad(currentTableName, tableDir);
760
761        // Ensure data shows up
762        int expectedRows = 0;
763        if (putSortReducer) {
764          // no rows should be extracted
765          assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
766            util.countRows(currentTable));
767        } else {
768          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
769          assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
770            util.countRows(currentTable));
771          Scan scan = new Scan();
772          ResultScanner results = currentTable.getScanner(scan);
773          for (Result res : results) {
774            assertEquals(FAMILIES.length, res.rawCells().length);
775            Cell first = res.rawCells()[0];
776            for (Cell kv : res.rawCells()) {
777              assertTrue(CellUtil.matchingRows(first, kv));
778              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
779            }
780          }
781          results.close();
782        }
783        String tableDigestBefore = util.checksumRows(currentTable);
784        // Check region locality
785        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
786        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
787          hbd.add(region.getHDFSBlocksDistribution());
788        }
789        for (String hostname : hostnames) {
790          float locality = hbd.getBlockLocalityIndex(hostname);
791          LOG.info("locality of [" + hostname + "]: " + locality);
792          assertEquals(100, (int) (locality * 100));
793        }
794
795        // Cause regions to reopen
796        admin.disableTable(currentTableName);
797        while (!admin.isTableDisabled(currentTableName)) {
798          Thread.sleep(200);
799          LOG.info("Waiting for table to disable");
800        }
801        admin.enableTable(currentTableName);
802        util.waitTableAvailable(currentTableName);
803        assertEquals("Data should remain after reopening of regions", tableDigestBefore,
804          util.checksumRows(currentTable));
805      }
806    } finally {
807      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
808        tableInfoSingle.getRegionLocator().close();
809      }
810      for (Entry<String, Table> singleTable : allTables.entrySet()) {
811        singleTable.getValue().close();
812        util.deleteTable(singleTable.getValue().getName());
813      }
814      testDir.getFileSystem(conf).delete(testDir, true);
815      util.shutdownMiniCluster();
816    }
817  }
818
819  private void runIncrementalPELoad(Configuration conf,
820    List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
821    throws IOException, InterruptedException, ClassNotFoundException {
822    Job job = new Job(conf, "testLocalMRIncrementalLoad");
823    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
824    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
825      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
826      CellSerialization.class.getName());
827    setupRandomGeneratorMapper(job, putSortReducer);
828    if (tableInfo.size() > 1) {
829      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
830      int sum = 0;
831      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
832        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
833      }
834      assertEquals(sum, job.getNumReduceTasks());
835    } else {
836      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
837      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getTableDescriptor(),
838        regionLocator);
839      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
840    }
841
842    FileOutputFormat.setOutputPath(job, outDir);
843
844    assertFalse(util.getTestFileSystem().exists(outDir));
845
846    assertTrue(job.waitForCompletion(true));
847  }
848
849  /**
850   * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the
851   * family compression map is correctly serialized into and deserialized from configuration
852   */
853  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
854  @Test
855  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
856    for (int numCfs = 0; numCfs <= 3; numCfs++) {
857      Configuration conf = new Configuration(this.util.getConfiguration());
858      Map<String, Compression.Algorithm> familyToCompression =
859        getMockColumnFamiliesForCompression(numCfs);
860      Table table = Mockito.mock(Table.class);
861      setupMockColumnFamiliesForCompression(table, familyToCompression);
862      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
863        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails,
864          Arrays.asList(table.getDescriptor())));
865
866      // read back family specific compression setting from the configuration
867      Map<byte[], Algorithm> retrievedFamilyToCompressionMap =
868        HFileOutputFormat2.createFamilyCompressionMap(conf);
869
870      // test that we have a value for all column families that matches with the
871      // used mock values
872      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
873        assertEquals("Compression configuration incorrect for column family:" + entry.getKey(),
874          entry.getValue(), retrievedFamilyToCompressionMap.get(Bytes.toBytes(entry.getKey())));
875      }
876    }
877  }
878
879  private void setupMockColumnFamiliesForCompression(Table table,
880    Map<String, Compression.Algorithm> familyToCompression) throws IOException {
881
882    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
883    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
884      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
885        .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
886        .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
887
888      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
889    }
890    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
891  }
892
893  /**
894   * @return a map from column family names to compression algorithms for testing column family
895   *         compression. Column family names have special characters
896   */
897  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) {
898    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
899    // use column family names having special characters
900    if (numCfs-- > 0) {
901      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
902    }
903    if (numCfs-- > 0) {
904      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
905    }
906    if (numCfs-- > 0) {
907      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
908    }
909    if (numCfs-- > 0) {
910      familyToCompression.put("Family3", Compression.Algorithm.NONE);
911    }
912    return familyToCompression;
913  }
914
915  /**
916   * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the
917   * family bloom type map is correctly serialized into and deserialized from configuration
918   */
919  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
920  @Test
921  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
922    for (int numCfs = 0; numCfs <= 2; numCfs++) {
923      Configuration conf = new Configuration(this.util.getConfiguration());
924      Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs);
925      Table table = Mockito.mock(Table.class);
926      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
927      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
928        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
929          Arrays.asList(table.getDescriptor())));
930
931      // read back family specific data block encoding settings from the
932      // configuration
933      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
934        HFileOutputFormat2.createFamilyBloomTypeMap(conf);
935
936      // test that we have a value for all column families that matches with the
937      // used mock values
938      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
939        assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(),
940          entry.getValue(), retrievedFamilyToBloomTypeMap.get(Bytes.toBytes(entry.getKey())));
941      }
942    }
943  }
944
945  private void setupMockColumnFamiliesForBloomType(Table table,
946    Map<String, BloomType> familyToDataBlockEncoding) throws IOException {
947    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
948    for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) {
949      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
950        .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
951        .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
952      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
953    }
954    Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
955  }
956
957  /**
958   * @return a map from column family names to compression algorithms for testing column family
959   *         compression. Column family names have special characters
960   */
961  private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) {
962    Map<String, BloomType> familyToBloomType = new HashMap<>();
963    // use column family names having special characters
964    if (numCfs-- > 0) {
965      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
966    }
967    if (numCfs-- > 0) {
968      familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL);
969    }
970    if (numCfs-- > 0) {
971      familyToBloomType.put("Family3", BloomType.NONE);
972    }
973    return familyToBloomType;
974  }
975
976  /**
977   * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the
978   * family block size map is correctly serialized into and deserialized from configuration
979   */
980  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
981  @Test
982  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
983    for (int numCfs = 0; numCfs <= 3; numCfs++) {
984      Configuration conf = new Configuration(this.util.getConfiguration());
985      Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs);
986      Table table = Mockito.mock(Table.class);
987      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
988      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
989        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
990          Arrays.asList(table.getDescriptor())));
991
992      // read back family specific data block encoding settings from the
993      // configuration
994      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
995        HFileOutputFormat2.createFamilyBlockSizeMap(conf);
996
997      // test that we have a value for all column families that matches with the
998      // used mock values
999      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
1000        assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(),
1001          entry.getValue(), retrievedFamilyToBlockSizeMap.get(Bytes.toBytes(entry.getKey())));
1002      }
1003    }
1004  }
1005
1006  private void setupMockColumnFamiliesForBlockSize(Table table,
1007    Map<String, Integer> familyToDataBlockEncoding) throws IOException {
1008    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
1009    for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) {
1010      ColumnFamilyDescriptor columnFamilyDescriptor =
1011        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
1012          .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
1013      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
1014    }
1015    Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
1016  }
1017
1018  /**
1019   * @return a map from column family names to compression algorithms for testing column family
1020   *         compression. Column family names have special characters
1021   */
1022  private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) {
1023    Map<String, Integer> familyToBlockSize = new HashMap<>();
1024    // use column family names having special characters
1025    if (numCfs-- > 0) {
1026      familyToBlockSize.put("Family1!@#!@#&", 1234);
1027    }
1028    if (numCfs-- > 0) {
1029      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
1030    }
1031    if (numCfs-- > 0) {
1032      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
1033    }
1034    if (numCfs-- > 0) {
1035      familyToBlockSize.put("Family3", 0);
1036    }
1037    return familyToBlockSize;
1038  }
1039
1040  /**
1041   * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that
1042   * the family data block encoding map is correctly serialized into and deserialized from
1043   * configuration
1044   */
1045  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
1046  @Test
1047  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
1048    for (int numCfs = 0; numCfs <= 3; numCfs++) {
1049      Configuration conf = new Configuration(this.util.getConfiguration());
1050      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
1051        getMockColumnFamiliesForDataBlockEncoding(numCfs);
1052      Table table = Mockito.mock(Table.class);
1053      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
1054      TableDescriptor tableDescriptor = table.getDescriptor();
1055      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
1056        HFileOutputFormat2.serializeColumnFamilyAttribute(
1057          HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));
1058
1059      // read back family specific data block encoding settings from the
1060      // configuration
1061      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
1062        HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);
1063
1064      // test that we have a value for all column families that matches with the
1065      // used mock values
1066      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
1067        assertEquals(
1068          "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(),
1069          entry.getValue(),
1070          retrievedFamilyToDataBlockEncodingMap.get(Bytes.toBytes(entry.getKey())));
1071      }
1072    }
1073  }
1074
1075  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
1076    Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
1077    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
1078    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
1079      ColumnFamilyDescriptor columnFamilyDescriptor =
1080        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
1081          .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0)
1082          .build();
1083      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
1084    }
1085    Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
1086  }
1087
1088  /**
1089   * @return a map from column family names to compression algorithms for testing column family
1090   *         compression. Column family names have special characters
1091   */
1092  private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
1093    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
1094    // use column family names having special characters
1095    if (numCfs-- > 0) {
1096      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
1097    }
1098    if (numCfs-- > 0) {
1099      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF);
1100    }
1101    if (numCfs-- > 0) {
1102      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX);
1103    }
1104    if (numCfs-- > 0) {
1105      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
1106    }
1107    return familyToDataBlockEncoding;
1108  }
1109
1110  private void setupMockStartKeys(RegionLocator table) throws IOException {
1111    byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"),
1112      Bytes.toBytes("ggg"), Bytes.toBytes("zzz") };
1113    Mockito.doReturn(mockKeys).when(table).getStartKeys();
1114  }
1115
1116  private void setupMockTableName(RegionLocator table) throws IOException {
1117    TableName mockTableName = TableName.valueOf("mock_table");
1118    Mockito.doReturn(mockTableName).when(table).getName();
1119  }
1120
1121  /**
1122   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings
1123   * from the column family descriptor
1124   */
1125  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
1126  @Test
1127  public void testColumnFamilySettings() throws Exception {
1128    Configuration conf = new Configuration(this.util.getConfiguration());
1129    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1130    TaskAttemptContext context = null;
1131    Path dir = util.getDataTestDir("testColumnFamilySettings");
1132
1133    // Setup table descriptor
1134    Table table = Mockito.mock(Table.class);
1135    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
1136    TableDescriptorBuilder tableDescriptorBuilder =
1137      TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
1138
1139    Mockito.doReturn(tableDescriptorBuilder.build()).when(table).getDescriptor();
1140    for (ColumnFamilyDescriptor hcd : HBaseTestingUtil.generateColumnDescriptors()) {
1141      tableDescriptorBuilder.setColumnFamily(hcd);
1142    }
1143
1144    // set up the table to return some mock keys
1145    setupMockStartKeys(regionLocator);
1146
1147    try {
1148      // partial map red setup to get an operational writer for testing
1149      // We turn off the sequence file compression, because DefaultCodec
1150      // pollutes the GZip codec pool with an incompatible compressor.
1151      conf.set("io.seqfile.compression.type", "NONE");
1152      conf.set("hbase.fs.tmp.dir", dir.toString());
1153      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
1154      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1155
1156      Job job = new Job(conf, "testLocalMRIncrementalLoad");
1157      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
1158      setupRandomGeneratorMapper(job, false);
1159      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
1160      FileOutputFormat.setOutputPath(job, dir);
1161      context = createTestTaskAttemptContext(job);
1162      HFileOutputFormat2 hof = new HFileOutputFormat2();
1163      writer = hof.getRecordWriter(context);
1164
1165      // write out random rows
1166      writeRandomKeyValues(writer, context, tableDescriptorBuilder.build().getColumnFamilyNames(),
1167        ROWSPERSPLIT);
1168      writer.close(context);
1169
1170      // Make sure that a directory was created for every CF
1171      FileSystem fs = dir.getFileSystem(conf);
1172
1173      // commit so that the filesystem has one directory per column family
1174      hof.getOutputCommitter(context).commitTask(context);
1175      hof.getOutputCommitter(context).commitJob(context);
1176      FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
1177      assertEquals(tableDescriptorBuilder.build().getColumnFamilies().length, families.length);
1178      for (FileStatus f : families) {
1179        String familyStr = f.getPath().getName();
1180        ColumnFamilyDescriptor hcd =
1181          tableDescriptorBuilder.build().getColumnFamily(Bytes.toBytes(familyStr));
1182        // verify that the compression on this file matches the configured
1183        // compression
1184        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
1185        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
1186        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();
1187
1188        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
1189        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
1190        assertEquals(
1191          "Incorrect bloom filter used for column family " + familyStr + "(reader: " + reader + ")",
1192          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
1193        assertEquals(
1194          "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")",
1195          hcd.getCompressionType(), reader.getFileContext().getCompression());
1196      }
1197    } finally {
1198      dir.getFileSystem(conf).delete(dir, true);
1199    }
1200  }
1201
1202  /**
1203   * Write random values to the writer assuming a table created using {@link #FAMILIES} as column
1204   * family descriptors
1205   */
1206  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
1207    TaskAttemptContext context, Set<byte[]> families, int numRows)
1208    throws IOException, InterruptedException {
1209    byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
1210    int valLength = 10;
1211    byte valBytes[] = new byte[valLength];
1212
1213    int taskId = context.getTaskAttemptID().getTaskID().getId();
1214    assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
1215    final byte[] qualifier = Bytes.toBytes("data");
1216    for (int i = 0; i < numRows; i++) {
1217      Bytes.putInt(keyBytes, 0, i);
1218      Bytes.random(valBytes);
1219      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
1220      for (byte[] family : families) {
1221        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
1222        writer.write(key, kv);
1223      }
1224    }
1225  }
1226
1227  /**
1228   * This test is to test the scenario happened in HBASE-6901. All files are bulk loaded and
1229   * excluded from minor compaction. Without the fix of HBASE-6901, an
1230   * ArrayIndexOutOfBoundsException will be thrown.
1231   */
1232  @Ignore("Flakey: See HBASE-9051")
1233  @Test
1234  public void testExcludeAllFromMinorCompaction() throws Exception {
1235    Configuration conf = util.getConfiguration();
1236    conf.setInt("hbase.hstore.compaction.min", 2);
1237    generateRandomStartKeys(5);
1238
1239    util.startMiniCluster();
1240    try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin();
1241      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1242      RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
1243      final FileSystem fs = util.getDFSCluster().getFileSystem();
1244      assertEquals("Should start with empty table", 0, util.countRows(table));
1245
1246      // deep inspection: get the StoreFile dir
1247      final Path storePath =
1248        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1249          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1250            Bytes.toString(FAMILIES[0])));
1251      assertEquals(0, fs.listStatus(storePath).length);
1252
1253      // Generate two bulk load files
1254      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
1255
1256      for (int i = 0; i < 2; i++) {
1257        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
1258        runIncrementalPELoad(conf,
1259          Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(),
1260            conn.getRegionLocator(TABLE_NAMES[0]))),
1261          testDir, false);
1262        // Perform the actual load
1263        BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
1264      }
1265
1266      // Ensure data shows up
1267      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1268      assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
1269        util.countRows(table));
1270
1271      // should have a second StoreFile now
1272      assertEquals(2, fs.listStatus(storePath).length);
1273
1274      // minor compactions shouldn't get rid of the file
1275      admin.compact(TABLE_NAMES[0]);
1276      try {
1277        quickPoll(new Callable<Boolean>() {
1278          @Override
1279          public Boolean call() throws Exception {
1280            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1281            for (HRegion region : regions) {
1282              for (HStore store : region.getStores()) {
1283                store.closeAndArchiveCompactedFiles();
1284              }
1285            }
1286            return fs.listStatus(storePath).length == 1;
1287          }
1288        }, 5000);
1289        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1290      } catch (AssertionError ae) {
1291        // this is expected behavior
1292      }
1293
1294      // a major compaction should work though
1295      admin.majorCompact(TABLE_NAMES[0]);
1296      quickPoll(new Callable<Boolean>() {
1297        @Override
1298        public Boolean call() throws Exception {
1299          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1300          for (HRegion region : regions) {
1301            for (HStore store : region.getStores()) {
1302              store.closeAndArchiveCompactedFiles();
1303            }
1304          }
1305          return fs.listStatus(storePath).length == 1;
1306        }
1307      }, 5000);
1308
1309    } finally {
1310      util.shutdownMiniCluster();
1311    }
1312  }
1313
1314  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
1315  @Test
1316  public void testExcludeMinorCompaction() throws Exception {
1317    Configuration conf = util.getConfiguration();
1318    conf.setInt("hbase.hstore.compaction.min", 2);
1319    generateRandomStartKeys(5);
1320
1321    util.startMiniCluster();
1322    try (Connection conn = ConnectionFactory.createConnection(conf);
1323      Admin admin = conn.getAdmin()) {
1324      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
1325      final FileSystem fs = util.getDFSCluster().getFileSystem();
1326      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1327      assertEquals("Should start with empty table", 0, util.countRows(table));
1328
1329      // deep inspection: get the StoreFile dir
1330      final Path storePath =
1331        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1332          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1333            Bytes.toString(FAMILIES[0])));
1334      assertEquals(0, fs.listStatus(storePath).length);
1335
1336      // put some data in it and flush to create a storefile
1337      Put p = new Put(Bytes.toBytes("test"));
1338      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
1339      table.put(p);
1340      admin.flush(TABLE_NAMES[0]);
1341      assertEquals(1, util.countRows(table));
1342      quickPoll(new Callable<Boolean>() {
1343        @Override
1344        public Boolean call() throws Exception {
1345          return fs.listStatus(storePath).length == 1;
1346        }
1347      }, 5000);
1348
1349      // Generate a bulk load file with more rows
1350      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
1351
1352      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
1353      runIncrementalPELoad(conf,
1354        Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), regionLocator)),
1355        testDir, false);
1356
1357      // Perform the actual load
1358      BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
1359
1360      // Ensure data shows up
1361      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1362      assertEquals("BulkLoadHFiles should put expected data in table", expectedRows + 1,
1363        util.countRows(table));
1364
1365      // should have a second StoreFile now
1366      assertEquals(2, fs.listStatus(storePath).length);
1367
1368      // minor compactions shouldn't get rid of the file
1369      admin.compact(TABLE_NAMES[0]);
1370      try {
1371        quickPoll(new Callable<Boolean>() {
1372          @Override
1373          public Boolean call() throws Exception {
1374            return fs.listStatus(storePath).length == 1;
1375          }
1376        }, 5000);
1377        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1378      } catch (AssertionError ae) {
1379        // this is expected behavior
1380      }
1381
1382      // a major compaction should work though
1383      admin.majorCompact(TABLE_NAMES[0]);
1384      quickPoll(new Callable<Boolean>() {
1385        @Override
1386        public Boolean call() throws Exception {
1387          return fs.listStatus(storePath).length == 1;
1388        }
1389      }, 5000);
1390
1391    } finally {
1392      util.shutdownMiniCluster();
1393    }
1394  }
1395
1396  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1397    int sleepMs = 10;
1398    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1399    while (retries-- > 0) {
1400      if (c.call().booleanValue()) {
1401        return;
1402      }
1403      Thread.sleep(sleepMs);
1404    }
1405    fail();
1406  }
1407
1408  public static void main(String args[]) throws Exception {
1409    new TestHFileOutputFormat2().manualTest(args);
1410  }
1411
1412  public void manualTest(String args[]) throws Exception {
1413    Configuration conf = HBaseConfiguration.create();
1414    util = new HBaseTestingUtil(conf);
1415    if ("newtable".equals(args[0])) {
1416      TableName tname = TableName.valueOf(args[1]);
1417      byte[][] splitKeys = generateRandomSplitKeys(4);
1418      Table table = util.createTable(tname, FAMILIES, splitKeys);
1419    } else if ("incremental".equals(args[0])) {
1420      TableName tname = TableName.valueOf(args[1]);
1421      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
1422        RegionLocator regionLocator = c.getRegionLocator(tname)) {
1423        Path outDir = new Path("incremental-out");
1424        runIncrementalPELoad(conf,
1425          Arrays
1426            .asList(new HFileOutputFormat2.TableInfo(admin.getDescriptor(tname), regionLocator)),
1427          outDir, false);
1428      }
1429    } else {
1430      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental");
1431    }
1432  }
1433
1434  @Test
1435  public void testBlockStoragePolicy() throws Exception {
1436    util = new HBaseTestingUtil();
1437    Configuration conf = util.getConfiguration();
1438    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
1439
1440    conf.set(
1441      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
1442        .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
1443      "ONE_SSD");
1444    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
1445    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
1446    util.startMiniDFSCluster(3);
1447    FileSystem fs = util.getDFSCluster().getFileSystem();
1448    try {
1449      fs.mkdirs(cf1Dir);
1450      fs.mkdirs(cf2Dir);
1451
1452      // the original block storage policy would be HOT
1453      String spA = getStoragePolicyName(fs, cf1Dir);
1454      String spB = getStoragePolicyName(fs, cf2Dir);
1455      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1456      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1457      assertEquals("HOT", spA);
1458      assertEquals("HOT", spB);
1459
1460      // alter table cf schema to change storage policies
1461      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1462        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
1463      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1464        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
1465      spA = getStoragePolicyName(fs, cf1Dir);
1466      spB = getStoragePolicyName(fs, cf2Dir);
1467      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1468      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1469      assertNotNull(spA);
1470      assertEquals("ONE_SSD", spA);
1471      assertNotNull(spB);
1472      assertEquals("ALL_SSD", spB);
1473    } finally {
1474      fs.delete(cf1Dir, true);
1475      fs.delete(cf2Dir, true);
1476      util.shutdownMiniDFSCluster();
1477    }
1478  }
1479
1480  private String getStoragePolicyName(FileSystem fs, Path path) {
1481    try {
1482      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
1483      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
1484    } catch (Exception e) {
1485      // Maybe fail because of using old HDFS version, try the old way
1486      if (LOG.isTraceEnabled()) {
1487        LOG.trace("Failed to get policy directly", e);
1488      }
1489      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
1490      return policy == null ? "HOT" : policy;// HOT by default
1491    }
1492  }
1493
1494  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
1495    try {
1496      if (fs instanceof DistributedFileSystem) {
1497        DistributedFileSystem dfs = (DistributedFileSystem) fs;
1498        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
1499        if (null != status) {
1500          byte storagePolicyId = status.getStoragePolicy();
1501          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
1502          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
1503            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
1504            for (BlockStoragePolicy policy : policies) {
1505              if (policy.getId() == storagePolicyId) {
1506                return policy.getName();
1507              }
1508            }
1509          }
1510        }
1511      }
1512    } catch (Throwable e) {
1513      LOG.warn("failed to get block storage policy of [" + path + "]", e);
1514    }
1515
1516    return null;
1517  }
1518
1519  @Test
1520  public void TestConfigurePartitioner() throws IOException {
1521    Configuration conf = util.getConfiguration();
1522    // Create a user who is not the current user
1523    String fooUserName = "foo1234";
1524    String fooGroupName = "group1";
1525    UserGroupInformation ugi =
1526      UserGroupInformation.createUserForTesting(fooUserName, new String[] { fooGroupName });
1527    // Get user's home directory
1528    Path fooHomeDirectory = ugi.doAs(new PrivilegedAction<Path>() {
1529      @Override
1530      public Path run() {
1531        try (FileSystem fs = FileSystem.get(conf)) {
1532          return fs.makeQualified(fs.getHomeDirectory());
1533        } catch (IOException ioe) {
1534          LOG.error("Failed to get foo's home directory", ioe);
1535        }
1536        return null;
1537      }
1538    });
1539
1540    Job job = Mockito.mock(Job.class);
1541    Mockito.doReturn(conf).when(job).getConfiguration();
1542    ImmutableBytesWritable writable = new ImmutableBytesWritable();
1543    List<ImmutableBytesWritable> splitPoints = new LinkedList<ImmutableBytesWritable>();
1544    splitPoints.add(writable);
1545
1546    ugi.doAs(new PrivilegedAction<Void>() {
1547      @Override
1548      public Void run() {
1549        try {
1550          HFileOutputFormat2.configurePartitioner(job, splitPoints, false);
1551        } catch (IOException ioe) {
1552          LOG.error("Failed to configure partitioner", ioe);
1553        }
1554        return null;
1555      }
1556    });
1557    FileSystem fs = FileSystem.get(conf);
1558    // verify that the job uses TotalOrderPartitioner
1559    verify(job).setPartitionerClass(TotalOrderPartitioner.class);
1560    // verify that TotalOrderPartitioner.setPartitionFile() is called.
1561    String partitionPathString = conf.get("mapreduce.totalorderpartitioner.path");
1562    Assert.assertNotNull(partitionPathString);
1563    // Make sure the partion file is in foo1234's home directory, and that
1564    // the file exists.
1565    Assert.assertTrue(partitionPathString.startsWith(fooHomeDirectory.toString()));
1566    Assert.assertTrue(fs.exists(new Path(partitionPathString)));
1567  }
1568
1569  @Test
1570  public void TestConfigureCompression() throws Exception {
1571    Configuration conf = new Configuration(this.util.getConfiguration());
1572    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1573    TaskAttemptContext context = null;
1574    Path dir = util.getDataTestDir("TestConfigureCompression");
1575    String hfileoutputformatCompression = "gz";
1576
1577    try {
1578      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
1579      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1580
1581      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);
1582
1583      Job job = Job.getInstance(conf);
1584      FileOutputFormat.setOutputPath(job, dir);
1585      context = createTestTaskAttemptContext(job);
1586      HFileOutputFormat2 hof = new HFileOutputFormat2();
1587      writer = hof.getRecordWriter(context);
1588      final byte[] b = Bytes.toBytes("b");
1589
1590      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
1591      writer.write(new ImmutableBytesWritable(), kv);
1592      writer.close(context);
1593      writer = null;
1594      FileSystem fs = dir.getFileSystem(conf);
1595      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
1596      while (iterator.hasNext()) {
1597        LocatedFileStatus keyFileStatus = iterator.next();
1598        HFile.Reader reader =
1599          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
1600        assertEquals(reader.getTrailer().getCompressionCodec().getName(),
1601          hfileoutputformatCompression);
1602      }
1603    } finally {
1604      if (writer != null && context != null) {
1605        writer.close(context);
1606      }
1607      dir.getFileSystem(conf).delete(dir, true);
1608    }
1609
1610  }
1611
1612  @Test
1613  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
1614    // Start cluster A
1615    util = new HBaseTestingUtil();
1616    Configuration confA = util.getConfiguration();
1617    int hostCount = 3;
1618    int regionNum = 20;
1619    String[] hostnames = new String[hostCount];
1620    for (int i = 0; i < hostCount; ++i) {
1621      hostnames[i] = "datanode_" + i;
1622    }
1623    StartTestingClusterOption option = StartTestingClusterOption.builder()
1624      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
1625    util.startMiniCluster(option);
1626
1627    // Start cluster B
1628    HBaseTestingUtil utilB = new HBaseTestingUtil();
1629    Configuration confB = utilB.getConfiguration();
1630    utilB.startMiniCluster(option);
1631
1632    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
1633
1634    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
1635    TableName tableName = TableName.valueOf("table");
1636    // Create table in cluster B
1637    try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
1638      RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
1639      // Generate the bulk load files
1640      // Job has zookeeper configuration for cluster A
1641      // Assume reading from cluster A by TableInputFormat and creating hfiles to cluster B
1642      Job job = new Job(confA, "testLocalMRIncrementalLoad");
1643      Configuration jobConf = job.getConfiguration();
1644      final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
1645      job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
1646      setupRandomGeneratorMapper(job, false);
1647      HFileOutputFormat2.configureIncrementalLoad(job, table, r);
1648
1649      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
1650        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
1651      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
1652        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
1653      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
1654        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));
1655
1656      String bSpecificConfigKey = "my.override.config.for.b";
1657      String bSpecificConfigValue = "b-specific-value";
1658      jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX + bSpecificConfigKey,
1659        bSpecificConfigValue);
1660
1661      FileOutputFormat.setOutputPath(job, testDir);
1662
1663      assertFalse(util.getTestFileSystem().exists(testDir));
1664
1665      assertTrue(job.waitForCompletion(true));
1666
1667      final List<Configuration> configs =
1668        ConfigurationCaptorConnection.getCapturedConfigarutions(key);
1669
1670      assertFalse(configs.isEmpty());
1671      for (Configuration config : configs) {
1672        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
1673          config.get(HConstants.ZOOKEEPER_QUORUM));
1674        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
1675          config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
1676        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
1677          config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
1678
1679        assertEquals(bSpecificConfigValue, config.get(bSpecificConfigKey));
1680      }
1681    } finally {
1682      utilB.deleteTable(tableName);
1683      testDir.getFileSystem(confA).delete(testDir, true);
1684      util.shutdownMiniCluster();
1685      utilB.shutdownMiniCluster();
1686    }
1687  }
1688
1689  private static class ConfigurationCaptorConnection implements Connection {
1690    private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";
1691
1692    private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();
1693
1694    private final Connection delegate;
1695
1696    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user,
1697      ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
1698      // here we do not use this registry, so close it...
1699      registry.close();
1700      // here we use createAsyncConnection, to avoid infinite recursive as we reset the Connection
1701      // implementation in below method
1702      delegate =
1703        FutureUtils.get(ConnectionFactory.createAsyncConnection(conf, user, connectionAttributes))
1704          .toConnection();
1705
1706      final String uuid = conf.get(UUID_KEY);
1707      if (uuid != null) {
1708        confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
1709      }
1710    }
1711
1712    static UUID configureConnectionImpl(Configuration conf) {
1713      conf.setClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
1714        ConfigurationCaptorConnection.class, Connection.class);
1715
1716      final UUID uuid = UUID.randomUUID();
1717      conf.set(UUID_KEY, uuid.toString());
1718      return uuid;
1719    }
1720
1721    static List<Configuration> getCapturedConfigarutions(UUID key) {
1722      return confs.get(key);
1723    }
1724
1725    @Override
1726    public Configuration getConfiguration() {
1727      return delegate.getConfiguration();
1728    }
1729
1730    @Override
1731    public Table getTable(TableName tableName) throws IOException {
1732      return delegate.getTable(tableName);
1733    }
1734
1735    @Override
1736    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
1737      return delegate.getTable(tableName, pool);
1738    }
1739
1740    @Override
1741    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
1742      return delegate.getBufferedMutator(tableName);
1743    }
1744
1745    @Override
1746    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
1747      return delegate.getBufferedMutator(params);
1748    }
1749
1750    @Override
1751    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
1752      return delegate.getRegionLocator(tableName);
1753    }
1754
1755    @Override
1756    public void clearRegionLocationCache() {
1757      delegate.clearRegionLocationCache();
1758    }
1759
1760    @Override
1761    public Admin getAdmin() throws IOException {
1762      return delegate.getAdmin();
1763    }
1764
1765    @Override
1766    public void close() throws IOException {
1767      delegate.close();
1768    }
1769
1770    @Override
1771    public boolean isClosed() {
1772      return delegate.isClosed();
1773    }
1774
1775    @Override
1776    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
1777      return delegate.getTableBuilder(tableName, pool);
1778    }
1779
1780    @Override
1781    public AsyncConnection toAsyncConnection() {
1782      return delegate.toAsyncConnection();
1783    }
1784
1785    @Override
1786    public String getClusterId() {
1787      return delegate.getClusterId();
1788    }
1789
1790    @Override
1791    public Hbck getHbck() throws IOException {
1792      return delegate.getHbck();
1793    }
1794
1795    @Override
1796    public Hbck getHbck(ServerName masterServer) throws IOException {
1797      return delegate.getHbck(masterServer);
1798    }
1799
1800    @Override
1801    public void abort(String why, Throwable e) {
1802      delegate.abort(why, e);
1803    }
1804
1805    @Override
1806    public boolean isAborted() {
1807      return delegate.isAborted();
1808    }
1809  }
1810
1811}