001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.tool;
019
020import static org.hamcrest.Matchers.greaterThan;
021import static org.junit.Assert.assertArrayEquals;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertThrows;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.io.IOException;
028import java.nio.ByteBuffer;
029import java.util.ArrayList;
030import java.util.Collection;
031import java.util.List;
032import java.util.Locale;
033import java.util.Map;
034import java.util.TreeMap;
035import java.util.concurrent.atomic.AtomicInteger;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FSDataOutputStream;
038import org.apache.hadoop.fs.FileStatus;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.hbase.HBaseClassTestRule;
042import org.apache.hadoop.hbase.HBaseTestingUtility;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.NamespaceDescriptor;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.TableNotFoundException;
047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
049import org.apache.hadoop.hbase.client.Connection;
050import org.apache.hadoop.hbase.client.Table;
051import org.apache.hadoop.hbase.client.TableDescriptor;
052import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
053import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
054import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
055import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
056import org.apache.hadoop.hbase.io.hfile.CacheConfig;
057import org.apache.hadoop.hbase.io.hfile.HFile;
058import org.apache.hadoop.hbase.io.hfile.HFileScanner;
059import org.apache.hadoop.hbase.regionserver.BloomType;
060import org.apache.hadoop.hbase.testclassification.LargeTests;
061import org.apache.hadoop.hbase.testclassification.MiscTests;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.util.CommonFSUtils;
064import org.apache.hadoop.hbase.util.HFileTestUtil;
065import org.hamcrest.MatcherAssert;
066import org.junit.AfterClass;
067import org.junit.BeforeClass;
068import org.junit.ClassRule;
069import org.junit.Rule;
070import org.junit.Test;
071import org.junit.experimental.categories.Category;
072import org.junit.rules.TestName;
073
074import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
075
076/**
077 * Test cases for the "load" half of the HFileOutputFormat bulk load functionality. These tests run
078 * faster than the full MR cluster tests in TestHFileOutputFormat
079 */
080@Category({ MiscTests.class, LargeTests.class })
081public class TestLoadIncrementalHFiles {
082
  // Standard HBase class rule (timeout / category enforcement for this test class).
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLoadIncrementalHFiles.class);

  // Exposes the current test method name; used to derive table and directory names.
  @Rule
  public TestName tn = new TestName();

  private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
  private static final byte[] FAMILY = Bytes.toBytes("myfam");
  // Non-default namespace exercised by the namespaced bulk-load variants.
  private static final String NAMESPACE = "bulkNS";

  // Substring expected in the IOException raised for an unknown column family.
  static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
  // Configured limit checked by testLoadTooMayHFiles (see setUpBeforeClass).
  static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;

  // Split points used by tests that pre-create the target table.
  private static final byte[][] SPLIT_KEYS =
    new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") };

  // Shared mini-cluster harness; started once in setUpBeforeClass().
  static HBaseTestingUtility util = new HBaseTestingUtility();
101
102  @BeforeClass
103  public static void setUpBeforeClass() throws Exception {
104    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
105    util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
106      MAX_FILES_PER_REGION_PER_FAMILY);
107    // change default behavior so that tag values are returned with normal rpcs
108    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
109      KeyValueCodecWithTags.class.getCanonicalName());
110    util.startMiniCluster();
111
112    setupNamespace();
113  }
114
  // Creates the non-default namespace used by the namespaced bulk-load tests.
  protected static void setupNamespace() throws Exception {
    util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
  }
118
  // Stops the mini cluster started in setUpBeforeClass().
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }
123
124  @Test
125  public void testSimpleLoadWithMap() throws Exception {
126    runTest("testSimpleLoadWithMap", BloomType.NONE,
127      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
128        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
129      true);
130  }
131
132  /**
133   * Test case that creates some regions and loads HFiles that fit snugly inside those regions
134   */
135  @Test
136  public void testSimpleLoad() throws Exception {
137    runTest("testSimpleLoad", BloomType.NONE,
138      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
139        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, });
140  }
141
142  @Test
143  public void testSimpleLoadWithFileCopy() throws Exception {
144    String testName = tn.getMethodName();
145    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
146    runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), false, null,
147      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
148        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
149      false, true, 2);
150  }
151
152  /**
153   * Test case that creates some regions and loads HFiles that cross the boundaries of those regions
154   */
155  @Test
156  public void testRegionCrossingLoad() throws Exception {
157    runTest("testRegionCrossingLoad", BloomType.NONE,
158      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
159        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
160  }
161
162  /**
163   * Test loading into a column family that has a ROW bloom filter.
164   */
165  @Test
166  public void testRegionCrossingRowBloom() throws Exception {
167    runTest("testRegionCrossingLoadRowBloom", BloomType.ROW,
168      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
169        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
170  }
171
172  /**
173   * Test loading into a column family that has a ROWCOL bloom filter.
174   */
175  @Test
176  public void testRegionCrossingRowColBloom() throws Exception {
177    runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL,
178      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
179        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
180  }
181
182  /**
183   * Test case that creates some regions and loads HFiles that have different region boundaries than
184   * the table pre-split.
185   */
186  @Test
187  public void testSimpleHFileSplit() throws Exception {
188    runTest("testHFileSplit", BloomType.NONE,
189      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
190        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
191      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
192        new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, });
193  }
194
195  /**
196   * Test case that creates some regions and loads HFiles that cross the boundaries and have
197   * different region boundaries than the table pre-split.
198   */
199  @Test
200  public void testRegionCrossingHFileSplit() throws Exception {
201    testRegionCrossingHFileSplit(BloomType.NONE);
202  }
203
204  /**
205   * Test case that creates some regions and loads HFiles that cross the boundaries have a ROW bloom
206   * filter and a different region boundaries than the table pre-split.
207   */
208  @Test
209  public void testRegionCrossingHFileSplitRowBloom() throws Exception {
210    testRegionCrossingHFileSplit(BloomType.ROW);
211  }
212
213  /**
214   * Test case that creates some regions and loads HFiles that cross the boundaries have a ROWCOL
215   * bloom filter and a different region boundaries than the table pre-split.
216   */
217  @Test
218  public void testRegionCrossingHFileSplitRowColBloom() throws Exception {
219    testRegionCrossingHFileSplit(BloomType.ROWCOL);
220  }
221
222  @Test
223  public void testSplitALot() throws Exception {
224    runTest("testSplitALot", BloomType.NONE,
225      new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"),
226        Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"),
227        Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
228        Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
229        Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"),
230        Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), },
231      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, });
232  }
233
234  private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
235    runTest("testHFileSplit" + bloomType + "Bloom", bloomType,
236      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
237        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
238      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
239        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
240  }
241
242  private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
243    return TableDescriptorBuilder.newBuilder(tableName)
244      .setColumnFamily(
245        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build())
246      .build();
247  }
248
  /** Convenience overload: no pre-split keys, CLI-style load (no map). */
  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges)
    throws Exception {
    runTest(testName, bloomType, null, hfileRanges);
  }

  /** Convenience overload: no pre-split keys; {@code useMap} selects the Map-based load API. */
  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges, boolean useMap)
    throws Exception {
    runTest(testName, bloomType, null, hfileRanges, useMap);
  }

  /** Convenience overload: pre-creates the table with {@code tableSplitKeys} when non-null. */
  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
    byte[][][] hfileRanges) throws Exception {
    runTest(testName, bloomType, tableSplitKeys, hfileRanges, false);
  }
263
  /**
   * Runs one scenario up to three times: into the default namespace, (when pre-split) from a
   * depth-3 directory layout, and into the {@code bulkNS} namespace.
   */
  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
    byte[][][] hfileRanges, boolean useMap) throws Exception {
    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
    // Only pre-create the table when split keys were supplied.
    final boolean preCreateTable = tableSplitKeys != null;

    // Run the test bulkloading the table to the default namespace
    final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
      useMap, 2);

    /*
     * Run the test bulkloading the table from a depth of 3 directory structure is now baseDirectory
     * -- regionDir -- familyDir -- storeFileDir
     */
    if (preCreateTable) {
      runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges, false,
        3);
    }

    // Run the test bulkloading the table to the specified namespace
    final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap,
      2);
  }
288
  /** Builds the table descriptor for {@code tableName} and delegates to the descriptor variant. */
  private void runTest(String testName, TableName tableName, BloomType bloomType,
    boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
    int depth) throws Exception {
    TableDescriptor htd = buildHTD(tableName, bloomType);
    runTest(testName, htd, preCreateTable, tableSplitKeys, hfileRanges, useMap, false, depth);
  }
295
  /** Overload defaulting the directory {@code depth} to 2 (family dir directly under base dir). */
  public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
    byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
    byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount,
    int factor) throws Exception {
    return loadHFiles(testName, htd, util, fam, qual, preCreateTable, tableSplitKeys, hfileRanges,
      useMap, deleteFile, copyFiles, initRowCount, factor, 2);
  }
303
304  public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
305    byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
306    byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount,
307    int factor, int depth) throws Exception {
308    Path baseDirectory = util.getDataTestDirOnTestFS(testName);
309    FileSystem fs = util.getTestFileSystem();
310    baseDirectory = baseDirectory.makeQualified(fs.getUri(), fs.getWorkingDirectory());
311    Path parentDir = baseDirectory;
312    if (depth == 3) {
313      assert !useMap;
314      parentDir = new Path(baseDirectory, "someRegion");
315    }
316    Path familyDir = new Path(parentDir, Bytes.toString(fam));
317
318    int hfileIdx = 0;
319    Map<byte[], List<Path>> map = null;
320    List<Path> list = null;
321    if (useMap || copyFiles) {
322      list = new ArrayList<>();
323    }
324    if (useMap) {
325      map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
326      map.put(fam, list);
327    }
328    Path last = null;
329    for (byte[][] range : hfileRanges) {
330      byte[] from = range[0];
331      byte[] to = range[1];
332      Path path = new Path(familyDir, "hfile_" + hfileIdx++);
333      HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor);
334      if (useMap) {
335        last = path;
336        list.add(path);
337      }
338    }
339    int expectedRows = hfileIdx * factor;
340
341    TableName tableName = htd.getTableName();
342    if (!util.getAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
343      util.getAdmin().createTable(htd, tableSplitKeys);
344    }
345
346    Configuration conf = util.getConfiguration();
347    if (copyFiles) {
348      conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
349    }
350    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
351    List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString());
352    if (depth == 3) {
353      args.add("-loadTable");
354    }
355
356    if (useMap) {
357      if (deleteFile) {
358        fs.delete(last, true);
359      }
360      Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded = loader.bulkLoad(tableName, map);
361      if (deleteFile) {
362        expectedRows -= 1000;
363        for (BulkLoadHFiles.LoadQueueItem item : loaded.keySet()) {
364          if (item.getFilePath().getName().equals(last.getName())) {
365            fail(last + " should be missing");
366          }
367        }
368      }
369    } else {
370      loader.run(args.toArray(new String[] {}));
371    }
372
373    if (copyFiles) {
374      for (Path p : list) {
375        assertTrue(p + " should exist", fs.exists(p));
376      }
377    }
378
379    Table table = util.getConnection().getTable(tableName);
380    try {
381      assertEquals(initRowCount + expectedRows, util.countRows(table));
382    } finally {
383      table.close();
384    }
385
386    return expectedRows;
387  }
388
389  private void runTest(String testName, TableDescriptor htd, boolean preCreateTable,
390    byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, boolean copyFiles, int depth)
391    throws Exception {
392    loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges,
393      useMap, true, copyFiles, 0, 1000, depth);
394
395    final TableName tableName = htd.getTableName();
396    // verify staging folder has been cleaned up
397    Path stagingBasePath = new Path(CommonFSUtils.getRootDir(util.getConfiguration()),
398      HConstants.BULKLOAD_STAGING_DIR_NAME);
399    FileSystem fs = util.getTestFileSystem();
400    if (fs.exists(stagingBasePath)) {
401      FileStatus[] files = fs.listStatus(stagingBasePath);
402      for (FileStatus file : files) {
403        assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
404          file.getPath().getName() != "DONOTERASE");
405      }
406    }
407
408    util.deleteTable(tableName);
409  }
410
411  /**
412   * Test that tags survive through a bulk load that needs to split hfiles. This test depends on the
413   * "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client can get tags in the
414   * responses.
415   */
416  @Test
417  public void testTagsSurviveBulkLoadSplit() throws Exception {
418    Path dir = util.getDataTestDirOnTestFS(tn.getMethodName());
419    FileSystem fs = util.getTestFileSystem();
420    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
421    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
422    // table has these split points
423    byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"),
424      Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), };
425
426    // creating an hfile that has values that span the split points.
427    byte[] from = Bytes.toBytes("ddd");
428    byte[] to = Bytes.toBytes("ooo");
429    HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs,
430      new Path(familyDir, tn.getMethodName() + "_hfile"), FAMILY, QUALIFIER, from, to, 1000);
431    int expectedRows = 1000;
432
433    TableName tableName = TableName.valueOf(tn.getMethodName());
434    TableDescriptor htd = buildHTD(tableName, BloomType.NONE);
435    util.getAdmin().createTable(htd, tableSplitKeys);
436
437    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
438    String[] args = { dir.toString(), tableName.toString() };
439    loader.run(args);
440
441    Table table = util.getConnection().getTable(tableName);
442    try {
443      assertEquals(expectedRows, util.countRows(table));
444      HFileTestUtil.verifyTags(table);
445    } finally {
446      table.close();
447    }
448
449    util.deleteTable(tableName);
450  }
451
452  /**
453   * Test loading into a column family that does not exist.
454   */
455  @Test
456  public void testNonexistentColumnFamilyLoad() throws Exception {
457    String testName = tn.getMethodName();
458    byte[][][] hFileRanges =
459      new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
460        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, };
461
462    byte[] TABLE = Bytes.toBytes("mytable_" + testName);
463    // set real family name to upper case in purpose to simulate the case that
464    // family name in HFiles is invalid
465    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE))
466      .setColumnFamily(ColumnFamilyDescriptorBuilder
467        .of(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT))))
468      .build();
469
470    try {
471      runTest(testName, htd, true, SPLIT_KEYS, hFileRanges, false, false, 2);
472      assertTrue("Loading into table with non-existent family should have failed", false);
473    } catch (Exception e) {
474      assertTrue("IOException expected", e instanceof IOException);
475      // further check whether the exception message is correct
476      String errMsg = e.getMessage();
477      assertTrue(
478        "Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY
479          + "], current message: [" + errMsg + "]",
480        errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
481    }
482  }
483
  // Variant where the table is pre-created, so its family names differ from the dir contents.
  @Test
  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
    testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
  }

  // Variant where the loader creates the table itself from the directory layout.
  @Test
  public void testNonHfileFolder() throws Exception {
    testNonHfileFolder("testNonHfileFolder", false);
  }
493
494  /**
495   * Write a random data file and a non-file in a dir with a valid family name but not part of the
496   * table families. we should we able to bulkload without getting the unmatched family exception.
497   * HBASE-13037/HBASE-13227
498   */
499  private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
500    Path dir = util.getDataTestDirOnTestFS(tableName);
501    FileSystem fs = util.getTestFileSystem();
502    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
503
504    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
505    HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY,
506      QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
507    createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);
508
509    final String NON_FAMILY_FOLDER = "_logs";
510    Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
511    fs.mkdirs(nonFamilyDir);
512    fs.mkdirs(new Path(nonFamilyDir, "non-file"));
513    createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024);
514
515    Table table = null;
516    try {
517      if (preCreateTable) {
518        table = util.createTable(TableName.valueOf(tableName), FAMILY);
519      } else {
520        table = util.getConnection().getTable(TableName.valueOf(tableName));
521      }
522
523      final String[] args = { dir.toString(), tableName };
524      new LoadIncrementalHFiles(util.getConfiguration()).run(args);
525      assertEquals(500, util.countRows(table));
526    } finally {
527      if (table != null) {
528        table.close();
529      }
530      fs.delete(dir, true);
531    }
532  }
533
534  private static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException {
535    FSDataOutputStream stream = fs.create(path);
536    try {
537      byte[] data = new byte[1024];
538      for (int i = 0; i < data.length; ++i) {
539        data[i] = (byte) (i & 0xff);
540      }
541      while (size >= data.length) {
542        stream.write(data, 0, data.length);
543        size -= data.length;
544      }
545      if (size > 0) {
546        stream.write(data, 0, size);
547      }
548    } finally {
549      stream.close();
550    }
551  }
552
  // Splits a 1000-row HFile at "ggg" and verifies no rows are lost across the two halves.
  @Test
  public void testSplitStoreFile() throws IOException {
    Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
    FileSystem fs = util.getTestFileSystem();
    Path testIn = new Path(dir, "testhfile");
    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

    Path bottomOut = new Path(dir, "bottom.out");
    Path topOut = new Path(dir, "top.out");

    // Split at a key strictly inside the (aaa, zzz) range.
    LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
      Bytes.toBytes("ggg"), bottomOut, topOut);

    // The two halves together must contain every original row.
    int rowCount = verifyHFile(bottomOut);
    rowCount += verifyHFile(topOut);
    assertEquals(1000, rowCount);
  }
572
573  /**
574   * This method tests that the create_time property of the HFile produced by the splitstorefile
575   * method is greater than 0 HBASE-27688
576   */
577  @Test
578  public void testSplitStoreFileWithCreateTimeTS() throws IOException {
579    Path dir = util.getDataTestDirOnTestFS("testSplitStoreFileWithCreateTimeTS");
580    FileSystem fs = util.getTestFileSystem();
581    Path testIn = new Path(dir, "testhfile");
582    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
583    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
584      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
585
586    Path bottomOut = new Path(dir, "bottom.out");
587    Path topOut = new Path(dir, "top.out");
588
589    BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
590      Bytes.toBytes("ggg"), bottomOut, topOut);
591
592    verifyHFileCreateTimeTS(bottomOut);
593    verifyHFileCreateTimeTS(topOut);
594  }
595
596  private void verifyHFileCreateTimeTS(Path p) throws IOException {
597    Configuration conf = util.getConfiguration();
598
599    try (HFile.Reader reader =
600      HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf)) {
601      long fileCreateTime = reader.getHFileInfo().getHFileContext().getFileCreateTime();
602      MatcherAssert.assertThat(fileCreateTime, greaterThan(0L));
603    }
604  }
605
  // The four permutations of source/target data block encodings for a store-file split.
  @Test
  public void testSplitStoreFileWithNoneToNone() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE);
  }

  @Test
  public void testSplitStoreFileWithEncodedToEncoded() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF);
  }

  @Test
  public void testSplitStoreFileWithEncodedToNone() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE);
  }

  @Test
  public void testSplitStoreFileWithNoneToEncoded() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF);
  }
625
  /**
   * Splits an HFile written with {@code bulkloadEncoding} into a family configured with
   * {@code cfEncoding} and verifies all 1000 rows survive the split.
   */
  private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding,
    DataBlockEncoding cfEncoding) throws IOException {
    Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding");
    FileSystem fs = util.getTestFileSystem();
    Path testIn = new Path(dir, "testhfile");
    ColumnFamilyDescriptor familyDesc =
      ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build();
    HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn,
      bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

    Path bottomOut = new Path(dir, "bottom.out");
    Path topOut = new Path(dir, "top.out");

    LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
      Bytes.toBytes("ggg"), bottomOut, topOut);

    // No rows may be lost or duplicated across the two halves.
    int rowCount = verifyHFile(bottomOut);
    rowCount += verifyHFile(topOut);
    assertEquals(1000, rowCount);
  }
646
647  private int verifyHFile(Path p) throws IOException {
648    Configuration conf = util.getConfiguration();
649    HFile.Reader reader =
650      HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
651    HFileScanner scanner = reader.getScanner(conf, false, false);
652    scanner.seekTo();
653    int count = 0;
654    do {
655      count++;
656    } while (scanner.next());
657    assertTrue(count > 0);
658    reader.close();
659    return count;
660  }
661
662  private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
663    Integer value = map.containsKey(first) ? map.get(first) : 0;
664    map.put(first, value + 1);
665
666    value = map.containsKey(last) ? map.get(last) : 0;
667    map.put(last, value - 1);
668  }
669
670  @Test
671  public void testInferBoundaries() {
672    TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
673
674    /*
675     * Toy example c---------i o------p s---------t v------x a------e g-----k m-------------q r----s
676     * u----w Should be inferred as: a-----------------k m-------------q r--------------t
677     * u---------x The output should be (m,r,u)
678     */
679
680    String first;
681    String last;
682
683    first = "a";
684    last = "e";
685    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
686
687    first = "r";
688    last = "s";
689    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
690
691    first = "o";
692    last = "p";
693    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
694
695    first = "g";
696    last = "k";
697    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
698
699    first = "v";
700    last = "x";
701    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
702
703    first = "c";
704    last = "i";
705    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
706
707    first = "m";
708    last = "q";
709    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
710
711    first = "s";
712    last = "t";
713    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
714
715    first = "u";
716    last = "w";
717    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
718
719    byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map);
720    byte[][] compare = new byte[3][];
721    compare[0] = "m".getBytes();
722    compare[1] = "r".getBytes();
723    compare[2] = "u".getBytes();
724
725    assertEquals(3, keysArray.length);
726
727    for (int row = 0; row < keysArray.length; row++) {
728      assertArrayEquals(keysArray[row], compare[row]);
729    }
730  }
731
732  @Test
733  public void testLoadTooMayHFiles() throws Exception {
734    Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles");
735    FileSystem fs = util.getTestFileSystem();
736    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
737    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
738
739    byte[] from = Bytes.toBytes("begin");
740    byte[] to = Bytes.toBytes("end");
741    for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
742      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" + i),
743        FAMILY, QUALIFIER, from, to, 1000);
744    }
745
746    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
747    String[] args = { dir.toString(), "mytable_testLoadTooMayHFiles" };
748    try {
749      loader.run(args);
750      fail("Bulk loading too many files should fail");
751    } catch (IOException ie) {
752      assertTrue(ie.getMessage()
753        .contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
754    }
755  }
756
  // With create-table disabled, loading into a missing table must raise TableNotFoundException.
  @Test(expected = TableNotFoundException.class)
  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    String[] args = { "directory", "nonExistingTable" };
    loader.run(args);
  }
765
  // A family name starting with "_" must still load; underscore-prefixed directories are
  // otherwise treated as non-family folders (see testNonHfileFolder's "_logs").
  @Test
  public void testTableWithCFNameStartWithUnderScore() throws Exception {
    Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore");
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    String family = "_cf";
    Path familyDir = new Path(dir, family);

    byte[] from = Bytes.toBytes("begin");
    byte[] to = Bytes.toBytes("end");
    Configuration conf = util.getConfiguration();
    String tableName = tn.getMethodName();
    Table table = util.createTable(TableName.valueOf(tableName), family);
    HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family),
      QUALIFIER, from, to, 1000);

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    String[] args = { dir.toString(), tableName };
    try {
      loader.run(args);
      assertEquals(1000, util.countRows(table));
    } finally {
      if (null != table) {
        table.close();
      }
    }
  }
793
794  @Test
795  public void testBulkLoadByFamily() throws Exception {
796    Path dir = util.getDataTestDirOnTestFS("testBulkLoadByFamily");
797    FileSystem fs = util.getTestFileSystem();
798    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
799    String tableName = tn.getMethodName();
800    String[] families = { "cf1", "cf2", "cf3" };
801    for (int i = 0; i < families.length; i++) {
802      byte[] from = Bytes.toBytes(i + "begin");
803      byte[] to = Bytes.toBytes(i + "end");
804      Path familyDir = new Path(dir, families[i]);
805      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile"),
806        Bytes.toBytes(families[i]), QUALIFIER, from, to, 1000);
807    }
808    Table table = util.createTable(TableName.valueOf(tableName), families);
809    final AtomicInteger attmptedCalls = new AtomicInteger();
810    util.getConfiguration().setBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, true);
811    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
812      @Override
813      protected List<LoadQueueItem> tryAtomicRegionLoad(Connection connection, TableName tableName,
814        final byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) throws IOException {
815        attmptedCalls.incrementAndGet();
816        return super.tryAtomicRegionLoad(connection, tableName, first, lqis, copyFile);
817      }
818    };
819
820    String[] args = { dir.toString(), tableName };
821    try {
822      loader.run(args);
823      assertEquals(families.length, attmptedCalls.get());
824      assertEquals(1000 * families.length, util.countRows(table));
825    } finally {
826      if (null != table) {
827        table.close();
828      }
829      util.getConfiguration().setBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, false);
830    }
831  }
832
833  @Test
834  public void testFailIfNeedSplitHFile() throws IOException {
835    TableName tableName = TableName.valueOf(tn.getMethodName());
836    Table table = util.createTable(tableName, FAMILY);
837
838    util.loadTable(table, FAMILY);
839
840    FileSystem fs = util.getTestFileSystem();
841    Path sfPath = new Path(fs.getWorkingDirectory(), new Path(Bytes.toString(FAMILY), "file"));
842    HFileTestUtil.createHFile(util.getConfiguration(), fs, sfPath, FAMILY, QUALIFIER,
843      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
844
845    util.getAdmin().split(tableName);
846    util.waitFor(10000, 1000, () -> util.getAdmin().getRegions(tableName).size() > 1);
847
848    Configuration config = new Configuration(util.getConfiguration());
849    config.setBoolean(BulkLoadHFilesTool.FAIL_IF_NEED_SPLIT_HFILE, true);
850    BulkLoadHFilesTool tool = new BulkLoadHFilesTool(config);
851
852    String[] args = new String[] { fs.getWorkingDirectory().toString(), tableName.toString() };
853    assertThrows(IOException.class, () -> tool.run(args));
854    util.getHBaseCluster().getRegions(tableName)
855      .forEach(r -> assertEquals(1, r.getStore(FAMILY).getStorefiles().size()));
856  }
857}