001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.tool; 019 020import static org.hamcrest.Matchers.greaterThan; 021import static org.junit.Assert.assertArrayEquals; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertThrows; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026 027import java.io.IOException; 028import java.nio.ByteBuffer; 029import java.util.ArrayList; 030import java.util.Collection; 031import java.util.List; 032import java.util.Locale; 033import java.util.Map; 034import java.util.TreeMap; 035import java.util.concurrent.atomic.AtomicInteger; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FSDataOutputStream; 038import org.apache.hadoop.fs.FileStatus; 039import org.apache.hadoop.fs.FileSystem; 040import org.apache.hadoop.fs.Path; 041import org.apache.hadoop.hbase.HBaseClassTestRule; 042import org.apache.hadoop.hbase.HBaseTestingUtility; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.NamespaceDescriptor; 045import org.apache.hadoop.hbase.TableName; 046import org.apache.hadoop.hbase.TableNotFoundException; 047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 049import org.apache.hadoop.hbase.client.Connection; 050import org.apache.hadoop.hbase.client.Table; 051import org.apache.hadoop.hbase.client.TableDescriptor; 052import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 053import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; 054import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 055import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 056import org.apache.hadoop.hbase.io.hfile.CacheConfig; 057import org.apache.hadoop.hbase.io.hfile.HFile; 058import org.apache.hadoop.hbase.io.hfile.HFileScanner; 059import org.apache.hadoop.hbase.regionserver.BloomType; 060import org.apache.hadoop.hbase.testclassification.LargeTests; 061import org.apache.hadoop.hbase.testclassification.MiscTests; 062import org.apache.hadoop.hbase.util.Bytes; 063import org.apache.hadoop.hbase.util.CommonFSUtils; 064import org.apache.hadoop.hbase.util.HFileTestUtil; 065import org.hamcrest.MatcherAssert; 066import org.junit.AfterClass; 067import org.junit.BeforeClass; 068import org.junit.ClassRule; 069import org.junit.Rule; 070import org.junit.Test; 071import org.junit.experimental.categories.Category; 072import org.junit.rules.TestName; 073 074import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 075 076/** 077 * Test cases for the "load" half of the HFileOutputFormat bulk load functionality. These tests run 078 * faster than the full MR cluster tests in TestHFileOutputFormat 079 */ 080@Category({ MiscTests.class, LargeTests.class }) 081public class TestLoadIncrementalHFiles { 082 083 @ClassRule 084 public static final HBaseClassTestRule CLASS_RULE = 085 HBaseClassTestRule.forClass(TestLoadIncrementalHFiles.class); 086 087 @Rule 088 public TestName tn = new TestName(); 089 090 private static final byte[] QUALIFIER = Bytes.toBytes("myqual"); 091 private static final byte[] FAMILY = Bytes.toBytes("myfam"); 092 private static final String NAMESPACE = "bulkNS"; 093 094 static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found"; 095 static final int MAX_FILES_PER_REGION_PER_FAMILY = 4; 096 097 private static final byte[][] SPLIT_KEYS = 098 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") }; 099 100 static HBaseTestingUtility util = new HBaseTestingUtility(); 101 102 @BeforeClass 103 public static void setUpBeforeClass() throws Exception { 104 util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); 105 util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, 106 MAX_FILES_PER_REGION_PER_FAMILY); 107 // change default behavior so that tag values are returned with normal rpcs 108 util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY, 109 KeyValueCodecWithTags.class.getCanonicalName()); 110 util.startMiniCluster(); 111 112 setupNamespace(); 113 } 114 115 protected static void setupNamespace() throws Exception { 116 util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build()); 117 } 118 119 @AfterClass 120 public static void tearDownAfterClass() throws Exception { 121 util.shutdownMiniCluster(); 122 } 123 124 @Test 125 public void testSimpleLoadWithMap() throws Exception { 126 runTest("testSimpleLoadWithMap", BloomType.NONE, 127 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, 128 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }, 129 true); 130 } 131 132 /** 133 * Test case that creates some regions and loads HFiles that fit snugly inside those regions 134 */ 135 @Test 136 public void testSimpleLoad() throws Exception { 137 runTest("testSimpleLoad", BloomType.NONE, 138 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, 139 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }); 140 } 141 142 @Test 143 public void testSimpleLoadWithFileCopy() throws Exception { 144 String testName = tn.getMethodName(); 145 final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName); 146 runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), false, null, 147 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, 148 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }, 149 false, true, 2); 150 } 151 152 /** 153 * Test case that creates some regions and loads HFiles that cross the boundaries of those regions 154 */ 155 @Test 156 public void testRegionCrossingLoad() throws Exception { 157 runTest("testRegionCrossingLoad", BloomType.NONE, 158 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, 159 new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); 160 } 161 162 /** 163 * Test loading into a column family that has a ROW bloom filter. 164 */ 165 @Test 166 public void testRegionCrossingRowBloom() throws Exception { 167 runTest("testRegionCrossingLoadRowBloom", BloomType.ROW, 168 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, 169 new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); 170 } 171 172 /** 173 * Test loading into a column family that has a ROWCOL bloom filter. 174 */ 175 @Test 176 public void testRegionCrossingRowColBloom() throws Exception { 177 runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL, 178 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, 179 new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); 180 } 181 182 /** 183 * Test case that creates some regions and loads HFiles that have different region boundaries than 184 * the table pre-split. 185 */ 186 @Test 187 public void testSimpleHFileSplit() throws Exception { 188 runTest("testHFileSplit", BloomType.NONE, 189 new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"), 190 Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), }, 191 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") }, 192 new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, }); 193 } 194 195 /** 196 * Test case that creates some regions and loads HFiles that cross the boundaries and have 197 * different region boundaries than the table pre-split. 198 */ 199 @Test 200 public void testRegionCrossingHFileSplit() throws Exception { 201 testRegionCrossingHFileSplit(BloomType.NONE); 202 } 203 204 /** 205 * Test case that creates some regions and loads HFiles that cross the boundaries have a ROW bloom 206 * filter and a different region boundaries than the table pre-split. 207 */ 208 @Test 209 public void testRegionCrossingHFileSplitRowBloom() throws Exception { 210 testRegionCrossingHFileSplit(BloomType.ROW); 211 } 212 213 /** 214 * Test case that creates some regions and loads HFiles that cross the boundaries have a ROWCOL 215 * bloom filter and a different region boundaries than the table pre-split. 216 */ 217 @Test 218 public void testRegionCrossingHFileSplitRowColBloom() throws Exception { 219 testRegionCrossingHFileSplit(BloomType.ROWCOL); 220 } 221 222 @Test 223 public void testSplitALot() throws Exception { 224 runTest("testSplitALot", BloomType.NONE, 225 new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"), 226 Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"), 227 Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), 228 Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), 229 Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), 230 Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), }, 231 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, }); 232 } 233 234 private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception { 235 runTest("testHFileSplit" + bloomType + "Bloom", bloomType, 236 new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"), 237 Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), }, 238 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, 239 new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); 240 } 241 242 private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) { 243 return TableDescriptorBuilder.newBuilder(tableName) 244 .setColumnFamily( 245 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build()) 246 .build(); 247 } 248 249 private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges) 250 throws Exception { 251 runTest(testName, bloomType, null, hfileRanges); 252 } 253 254 private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges, boolean useMap) 255 throws Exception { 256 runTest(testName, bloomType, null, hfileRanges, useMap); 257 } 258 259 private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys, 260 byte[][][] hfileRanges) throws Exception { 261 runTest(testName, bloomType, tableSplitKeys, hfileRanges, false); 262 } 263 264 private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys, 265 byte[][][] hfileRanges, boolean useMap) throws Exception { 266 final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName); 267 final boolean preCreateTable = tableSplitKeys != null; 268 269 // Run the test bulkloading the table to the default namespace 270 final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME); 271 runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, 272 useMap, 2); 273 274 /* 275 * Run the test bulkloading the table from a depth of 3 directory structure is now baseDirectory 276 * -- regionDir -- familyDir -- storeFileDir 277 */ 278 if (preCreateTable) { 279 runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges, false, 280 3); 281 } 282 283 // Run the test bulkloading the table to the specified namespace 284 final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME); 285 runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, 286 2); 287 } 288 289 private void runTest(String testName, TableName tableName, BloomType bloomType, 290 boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, 291 int depth) throws Exception { 292 TableDescriptor htd = buildHTD(tableName, bloomType); 293 runTest(testName, htd, preCreateTable, tableSplitKeys, hfileRanges, useMap, false, depth); 294 } 295 296 public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util, 297 byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys, 298 byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount, 299 int factor) throws Exception { 300 return loadHFiles(testName, htd, util, fam, qual, preCreateTable, tableSplitKeys, hfileRanges, 301 useMap, deleteFile, copyFiles, initRowCount, factor, 2); 302 } 303 304 public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util, 305 byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys, 306 byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount, 307 int factor, int depth) throws Exception { 308 Path baseDirectory = util.getDataTestDirOnTestFS(testName); 309 FileSystem fs = util.getTestFileSystem(); 310 baseDirectory = baseDirectory.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 311 Path parentDir = baseDirectory; 312 if (depth == 3) { 313 assert !useMap; 314 parentDir = new Path(baseDirectory, "someRegion"); 315 } 316 Path familyDir = new Path(parentDir, Bytes.toString(fam)); 317 318 int hfileIdx = 0; 319 Map<byte[], List<Path>> map = null; 320 List<Path> list = null; 321 if (useMap || copyFiles) { 322 list = new ArrayList<>(); 323 } 324 if (useMap) { 325 map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 326 map.put(fam, list); 327 } 328 Path last = null; 329 for (byte[][] range : hfileRanges) { 330 byte[] from = range[0]; 331 byte[] to = range[1]; 332 Path path = new Path(familyDir, "hfile_" + hfileIdx++); 333 HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor); 334 if (useMap) { 335 last = path; 336 list.add(path); 337 } 338 } 339 int expectedRows = hfileIdx * factor; 340 341 TableName tableName = htd.getTableName(); 342 if (!util.getAdmin().tableExists(tableName) && (preCreateTable || map != null)) { 343 util.getAdmin().createTable(htd, tableSplitKeys); 344 } 345 346 Configuration conf = util.getConfiguration(); 347 if (copyFiles) { 348 conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); 349 } 350 BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf); 351 List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString()); 352 if (depth == 3) { 353 args.add("-loadTable"); 354 } 355 356 if (useMap) { 357 if (deleteFile) { 358 fs.delete(last, true); 359 } 360 Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded = loader.bulkLoad(tableName, map); 361 if (deleteFile) { 362 expectedRows -= 1000; 363 for (BulkLoadHFiles.LoadQueueItem item : loaded.keySet()) { 364 if (item.getFilePath().getName().equals(last.getName())) { 365 fail(last + " should be missing"); 366 } 367 } 368 } 369 } else { 370 loader.run(args.toArray(new String[] {})); 371 } 372 373 if (copyFiles) { 374 for (Path p : list) { 375 assertTrue(p + " should exist", fs.exists(p)); 376 } 377 } 378 379 Table table = util.getConnection().getTable(tableName); 380 try { 381 assertEquals(initRowCount + expectedRows, util.countRows(table)); 382 } finally { 383 table.close(); 384 } 385 386 return expectedRows; 387 } 388 389 private void runTest(String testName, TableDescriptor htd, boolean preCreateTable, 390 byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, boolean copyFiles, int depth) 391 throws Exception { 392 loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges, 393 useMap, true, copyFiles, 0, 1000, depth); 394 395 final TableName tableName = htd.getTableName(); 396 // verify staging folder has been cleaned up 397 Path stagingBasePath = new Path(CommonFSUtils.getRootDir(util.getConfiguration()), 398 HConstants.BULKLOAD_STAGING_DIR_NAME); 399 FileSystem fs = util.getTestFileSystem(); 400 if (fs.exists(stagingBasePath)) { 401 FileStatus[] files = fs.listStatus(stagingBasePath); 402 for (FileStatus file : files) { 403 assertTrue("Folder=" + file.getPath() + " is not cleaned up.", 404 file.getPath().getName() != "DONOTERASE"); 405 } 406 } 407 408 util.deleteTable(tableName); 409 } 410 411 /** 412 * Test that tags survive through a bulk load that needs to split hfiles. This test depends on the 413 * "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client can get tags in the 414 * responses. 415 */ 416 @Test 417 public void testTagsSurviveBulkLoadSplit() throws Exception { 418 Path dir = util.getDataTestDirOnTestFS(tn.getMethodName()); 419 FileSystem fs = util.getTestFileSystem(); 420 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 421 Path familyDir = new Path(dir, Bytes.toString(FAMILY)); 422 // table has these split points 423 byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), 424 Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), }; 425 426 // creating an hfile that has values that span the split points. 427 byte[] from = Bytes.toBytes("ddd"); 428 byte[] to = Bytes.toBytes("ooo"); 429 HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs, 430 new Path(familyDir, tn.getMethodName() + "_hfile"), FAMILY, QUALIFIER, from, to, 1000); 431 int expectedRows = 1000; 432 433 TableName tableName = TableName.valueOf(tn.getMethodName()); 434 TableDescriptor htd = buildHTD(tableName, BloomType.NONE); 435 util.getAdmin().createTable(htd, tableSplitKeys); 436 437 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()); 438 String[] args = { dir.toString(), tableName.toString() }; 439 loader.run(args); 440 441 Table table = util.getConnection().getTable(tableName); 442 try { 443 assertEquals(expectedRows, util.countRows(table)); 444 HFileTestUtil.verifyTags(table); 445 } finally { 446 table.close(); 447 } 448 449 util.deleteTable(tableName); 450 } 451 452 /** 453 * Test loading into a column family that does not exist. 454 */ 455 @Test 456 public void testNonexistentColumnFamilyLoad() throws Exception { 457 String testName = tn.getMethodName(); 458 byte[][][] hFileRanges = 459 new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") }, 460 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }; 461 462 byte[] TABLE = Bytes.toBytes("mytable_" + testName); 463 // set real family name to upper case in purpose to simulate the case that 464 // family name in HFiles is invalid 465 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE)) 466 .setColumnFamily(ColumnFamilyDescriptorBuilder 467 .of(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT)))) 468 .build(); 469 470 try { 471 runTest(testName, htd, true, SPLIT_KEYS, hFileRanges, false, false, 2); 472 assertTrue("Loading into table with non-existent family should have failed", false); 473 } catch (Exception e) { 474 assertTrue("IOException expected", e instanceof IOException); 475 // further check whether the exception message is correct 476 String errMsg = e.getMessage(); 477 assertTrue( 478 "Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY 479 + "], current message: [" + errMsg + "]", 480 errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY)); 481 } 482 } 483 484 @Test 485 public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception { 486 testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true); 487 } 488 489 @Test 490 public void testNonHfileFolder() throws Exception { 491 testNonHfileFolder("testNonHfileFolder", false); 492 } 493 494 /** 495 * Write a random data file and a non-file in a dir with a valid family name but not part of the 496 * table families. we should we able to bulkload without getting the unmatched family exception. 497 * HBASE-13037/HBASE-13227 498 */ 499 private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception { 500 Path dir = util.getDataTestDirOnTestFS(tableName); 501 FileSystem fs = util.getTestFileSystem(); 502 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 503 504 Path familyDir = new Path(dir, Bytes.toString(FAMILY)); 505 HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY, 506 QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500); 507 createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024); 508 509 final String NON_FAMILY_FOLDER = "_logs"; 510 Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER); 511 fs.mkdirs(nonFamilyDir); 512 fs.mkdirs(new Path(nonFamilyDir, "non-file")); 513 createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024); 514 515 Table table = null; 516 try { 517 if (preCreateTable) { 518 table = util.createTable(TableName.valueOf(tableName), FAMILY); 519 } else { 520 table = util.getConnection().getTable(TableName.valueOf(tableName)); 521 } 522 523 final String[] args = { dir.toString(), tableName }; 524 new LoadIncrementalHFiles(util.getConfiguration()).run(args); 525 assertEquals(500, util.countRows(table)); 526 } finally { 527 if (table != null) { 528 table.close(); 529 } 530 fs.delete(dir, true); 531 } 532 } 533 534 private static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException { 535 FSDataOutputStream stream = fs.create(path); 536 try { 537 byte[] data = new byte[1024]; 538 for (int i = 0; i < data.length; ++i) { 539 data[i] = (byte) (i & 0xff); 540 } 541 while (size >= data.length) { 542 stream.write(data, 0, data.length); 543 size -= data.length; 544 } 545 if (size > 0) { 546 stream.write(data, 0, size); 547 } 548 } finally { 549 stream.close(); 550 } 551 } 552 553 @Test 554 public void testSplitStoreFile() throws IOException { 555 Path dir = util.getDataTestDirOnTestFS("testSplitHFile"); 556 FileSystem fs = util.getTestFileSystem(); 557 Path testIn = new Path(dir, "testhfile"); 558 ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY); 559 HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER, 560 Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 561 562 Path bottomOut = new Path(dir, "bottom.out"); 563 Path topOut = new Path(dir, "top.out"); 564 565 LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc, 566 Bytes.toBytes("ggg"), bottomOut, topOut); 567 568 int rowCount = verifyHFile(bottomOut); 569 rowCount += verifyHFile(topOut); 570 assertEquals(1000, rowCount); 571 } 572 573 /** 574 * This method tests that the create_time property of the HFile produced by the splitstorefile 575 * method is greater than 0 HBASE-27688 576 */ 577 @Test 578 public void testSplitStoreFileWithCreateTimeTS() throws IOException { 579 Path dir = util.getDataTestDirOnTestFS("testSplitStoreFileWithCreateTimeTS"); 580 FileSystem fs = util.getTestFileSystem(); 581 Path testIn = new Path(dir, "testhfile"); 582 ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY); 583 HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER, 584 Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 585 586 Path bottomOut = new Path(dir, "bottom.out"); 587 Path topOut = new Path(dir, "top.out"); 588 589 BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc, 590 Bytes.toBytes("ggg"), bottomOut, topOut); 591 592 verifyHFileCreateTimeTS(bottomOut); 593 verifyHFileCreateTimeTS(topOut); 594 } 595 596 private void verifyHFileCreateTimeTS(Path p) throws IOException { 597 Configuration conf = util.getConfiguration(); 598 599 try (HFile.Reader reader = 600 HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf)) { 601 long fileCreateTime = reader.getHFileInfo().getHFileContext().getFileCreateTime(); 602 MatcherAssert.assertThat(fileCreateTime, greaterThan(0L)); 603 } 604 } 605 606 @Test 607 public void testSplitStoreFileWithNoneToNone() throws IOException { 608 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE); 609 } 610 611 @Test 612 public void testSplitStoreFileWithEncodedToEncoded() throws IOException { 613 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF); 614 } 615 616 @Test 617 public void testSplitStoreFileWithEncodedToNone() throws IOException { 618 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE); 619 } 620 621 @Test 622 public void testSplitStoreFileWithNoneToEncoded() throws IOException { 623 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF); 624 } 625 626 private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding, 627 DataBlockEncoding cfEncoding) throws IOException { 628 Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding"); 629 FileSystem fs = util.getTestFileSystem(); 630 Path testIn = new Path(dir, "testhfile"); 631 ColumnFamilyDescriptor familyDesc = 632 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build(); 633 HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn, 634 bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 635 636 Path bottomOut = new Path(dir, "bottom.out"); 637 Path topOut = new Path(dir, "top.out"); 638 639 LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc, 640 Bytes.toBytes("ggg"), bottomOut, topOut); 641 642 int rowCount = verifyHFile(bottomOut); 643 rowCount += verifyHFile(topOut); 644 assertEquals(1000, rowCount); 645 } 646 647 private int verifyHFile(Path p) throws IOException { 648 Configuration conf = util.getConfiguration(); 649 HFile.Reader reader = 650 HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf); 651 HFileScanner scanner = reader.getScanner(conf, false, false); 652 scanner.seekTo(); 653 int count = 0; 654 do { 655 count++; 656 } while (scanner.next()); 657 assertTrue(count > 0); 658 reader.close(); 659 return count; 660 } 661 662 private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) { 663 Integer value = map.containsKey(first) ? map.get(first) : 0; 664 map.put(first, value + 1); 665 666 value = map.containsKey(last) ? map.get(last) : 0; 667 map.put(last, value - 1); 668 } 669 670 @Test 671 public void testInferBoundaries() { 672 TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 673 674 /* 675 * Toy example c---------i o------p s---------t v------x a------e g-----k m-------------q r----s 676 * u----w Should be inferred as: a-----------------k m-------------q r--------------t 677 * u---------x The output should be (m,r,u) 678 */ 679 680 String first; 681 String last; 682 683 first = "a"; 684 last = "e"; 685 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 686 687 first = "r"; 688 last = "s"; 689 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 690 691 first = "o"; 692 last = "p"; 693 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 694 695 first = "g"; 696 last = "k"; 697 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 698 699 first = "v"; 700 last = "x"; 701 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 702 703 first = "c"; 704 last = "i"; 705 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 706 707 first = "m"; 708 last = "q"; 709 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 710 711 first = "s"; 712 last = "t"; 713 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 714 715 first = "u"; 716 last = "w"; 717 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 718 719 byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map); 720 byte[][] compare = new byte[3][]; 721 compare[0] = "m".getBytes(); 722 compare[1] = "r".getBytes(); 723 compare[2] = "u".getBytes(); 724 725 assertEquals(3, keysArray.length); 726 727 for (int row = 0; row < keysArray.length; row++) { 728 assertArrayEquals(keysArray[row], compare[row]); 729 } 730 } 731 732 @Test 733 public void testLoadTooMayHFiles() throws Exception { 734 Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles"); 735 FileSystem fs = util.getTestFileSystem(); 736 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 737 Path familyDir = new Path(dir, Bytes.toString(FAMILY)); 738 739 byte[] from = Bytes.toBytes("begin"); 740 byte[] to = Bytes.toBytes("end"); 741 for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) { 742 HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" + i), 743 FAMILY, QUALIFIER, from, to, 1000); 744 } 745 746 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()); 747 String[] args = { dir.toString(), "mytable_testLoadTooMayHFiles" }; 748 try { 749 loader.run(args); 750 fail("Bulk loading too many files should fail"); 751 } catch (IOException ie) { 752 assertTrue(ie.getMessage() 753 .contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles")); 754 } 755 } 756 757 @Test(expected = TableNotFoundException.class) 758 public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception { 759 Configuration conf = util.getConfiguration(); 760 conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no"); 761 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); 762 String[] args = { "directory", "nonExistingTable" }; 763 loader.run(args); 764 } 765 766 @Test 767 public void testTableWithCFNameStartWithUnderScore() throws Exception { 768 Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore"); 769 FileSystem fs = util.getTestFileSystem(); 770 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 771 String family = "_cf"; 772 Path familyDir = new Path(dir, family); 773 774 byte[] from = Bytes.toBytes("begin"); 775 byte[] to = Bytes.toBytes("end"); 776 Configuration conf = util.getConfiguration(); 777 String tableName = tn.getMethodName(); 778 Table table = util.createTable(TableName.valueOf(tableName), family); 779 HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family), 780 QUALIFIER, from, to, 1000); 781 782 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); 783 String[] args = { dir.toString(), tableName }; 784 try { 785 loader.run(args); 786 assertEquals(1000, util.countRows(table)); 787 } finally { 788 if (null != table) { 789 table.close(); 790 } 791 } 792 } 793 794 @Test 795 public void testBulkLoadByFamily() throws Exception { 796 Path dir = util.getDataTestDirOnTestFS("testBulkLoadByFamily"); 797 FileSystem fs = util.getTestFileSystem(); 798 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 799 String tableName = tn.getMethodName(); 800 String[] families = { "cf1", "cf2", "cf3" }; 801 for (int i = 0; i < families.length; i++) { 802 byte[] from = Bytes.toBytes(i + "begin"); 803 byte[] to = Bytes.toBytes(i + "end"); 804 Path familyDir = new Path(dir, families[i]); 805 HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile"), 806 Bytes.toBytes(families[i]), QUALIFIER, from, to, 1000); 807 } 808 Table table = util.createTable(TableName.valueOf(tableName), families); 809 final AtomicInteger attmptedCalls = new AtomicInteger(); 810 util.getConfiguration().setBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, true); 811 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) { 812 @Override 813 protected List<LoadQueueItem> tryAtomicRegionLoad(Connection connection, TableName tableName, 814 final byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) throws IOException { 815 attmptedCalls.incrementAndGet(); 816 return super.tryAtomicRegionLoad(connection, tableName, first, lqis, copyFile); 817 } 818 }; 819 820 String[] args = { dir.toString(), tableName }; 821 try { 822 loader.run(args); 823 assertEquals(families.length, attmptedCalls.get()); 824 assertEquals(1000 * families.length, util.countRows(table)); 825 } finally { 826 if (null != table) { 827 table.close(); 828 } 829 util.getConfiguration().setBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, false); 830 } 831 } 832 833 @Test 834 public void testFailIfNeedSplitHFile() throws IOException { 835 TableName tableName = TableName.valueOf(tn.getMethodName()); 836 Table table = util.createTable(tableName, FAMILY); 837 838 util.loadTable(table, FAMILY); 839 840 FileSystem fs = util.getTestFileSystem(); 841 Path sfPath = new Path(fs.getWorkingDirectory(), new Path(Bytes.toString(FAMILY), "file")); 842 HFileTestUtil.createHFile(util.getConfiguration(), fs, sfPath, FAMILY, QUALIFIER, 843 Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 844 845 util.getAdmin().split(tableName); 846 util.waitFor(10000, 1000, () -> util.getAdmin().getRegions(tableName).size() > 1); 847 848 Configuration config = new Configuration(util.getConfiguration()); 849 config.setBoolean(BulkLoadHFilesTool.FAIL_IF_NEED_SPLIT_HFILE, true); 850 BulkLoadHFilesTool tool = new BulkLoadHFilesTool(config); 851 852 String[] args = new String[] { fs.getWorkingDirectory().toString(), tableName.toString() }; 853 assertThrows(IOException.class, () -> tool.run(args)); 854 util.getHBaseCluster().getRegions(tableName) 855 .forEach(r -> assertEquals(1, r.getStore(FAMILY).getStorefiles().size())); 856 } 857}