001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.tool; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows; 021import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations; 022import static org.hamcrest.Matchers.greaterThan; 023import static org.junit.Assert.assertArrayEquals; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertThrows; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028 029import java.io.IOException; 030import java.net.InetAddress; 031import java.nio.ByteBuffer; 032import java.util.ArrayList; 033import java.util.Collection; 034import java.util.List; 035import java.util.Locale; 036import java.util.Map; 037import java.util.TreeMap; 038import java.util.concurrent.CompletableFuture; 039import java.util.concurrent.atomic.AtomicInteger; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FSDataOutputStream; 042import org.apache.hadoop.fs.FileStatus; 043import org.apache.hadoop.fs.FileSystem; 044import org.apache.hadoop.fs.Path; 045import org.apache.hadoop.hbase.HBaseClassTestRule; 046import 
org.apache.hadoop.hbase.HBaseTestingUtil; 047import org.apache.hadoop.hbase.HConstants; 048import org.apache.hadoop.hbase.HRegionLocation; 049import org.apache.hadoop.hbase.NamespaceDescriptor; 050import org.apache.hadoop.hbase.TableName; 051import org.apache.hadoop.hbase.TableNotFoundException; 052import org.apache.hadoop.hbase.client.AsyncClusterConnection; 053import org.apache.hadoop.hbase.client.AsyncTableRegionLocator; 054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 056import org.apache.hadoop.hbase.client.Table; 057import org.apache.hadoop.hbase.client.TableDescriptor; 058import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 059import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; 060import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 061import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 062import org.apache.hadoop.hbase.io.hfile.CacheConfig; 063import org.apache.hadoop.hbase.io.hfile.HFile; 064import org.apache.hadoop.hbase.io.hfile.HFileScanner; 065import org.apache.hadoop.hbase.regionserver.BloomType; 066import org.apache.hadoop.hbase.testclassification.LargeTests; 067import org.apache.hadoop.hbase.testclassification.MiscTests; 068import org.apache.hadoop.hbase.util.Bytes; 069import org.apache.hadoop.hbase.util.CommonFSUtils; 070import org.apache.hadoop.hbase.util.FutureUtils; 071import org.apache.hadoop.hbase.util.HFileTestUtil; 072import org.apache.hadoop.hdfs.DistributedFileSystem; 073import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 074import org.apache.hadoop.hdfs.protocol.LocatedBlock; 075import org.apache.hadoop.hdfs.protocol.LocatedBlocks; 076import org.hamcrest.MatcherAssert; 077import org.junit.AfterClass; 078import org.junit.BeforeClass; 079import org.junit.ClassRule; 080import org.junit.Rule; 081import org.junit.Test; 082import org.junit.experimental.categories.Category; 083import org.junit.rules.TestName; 084 085import 
org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Test cases for the "load" half of the HFileOutputFormat bulk load functionality. These tests run
 * faster than the full MR cluster tests in TestHFileOutputFormat
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestBulkLoadHFiles {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBulkLoadHFiles.class);

  // Current test method name; used to derive table and directory names.
  @Rule
  public TestName tn = new TestName();

  private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
  private static final byte[] FAMILY = Bytes.toBytes("myfam");
  private static final String NAMESPACE = "bulkNS";

  static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
  static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;

  // Pre-split points for tests that create the table before loading.
  private static final byte[][] SPLIT_KEYS =
    new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") };

  static HBaseTestingUtil util = new HBaseTestingUtil();

  /**
   * Starts the mini cluster with a per-region-per-family file limit and a tag-preserving RPC
   * codec, then creates the test namespace.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    util.getConfiguration().setInt(BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
      MAX_FILES_PER_REGION_PER_FAMILY);
    // change default behavior so that tag values are returned with normal rpcs
    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
      KeyValueCodecWithTags.class.getCanonicalName());
    util.startMiniCluster();

    setupNamespace();
  }

  // Protected so subclasses can override how the namespace is prepared.
  protected static void setupNamespace() throws Exception {
    util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }

  /**
   * Bulk load via the in-memory family-to-files map API rather than a directory scan.
   */
  @Test
  public void testSimpleLoadWithMap() throws Exception {
    runTest("testSimpleLoadWithMap", BloomType.NONE,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
      true);
  }

  /**
   * Test case that creates some regions and loads HFiles that fit snugly inside those regions
   */
  @Test
  public void testSimpleLoad() throws Exception {
    runTest("testSimpleLoad", BloomType.NONE,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, });
  }

  /**
   * Bulk load with ALWAYS_COPY_FILES enabled; the source files must survive the load.
   */
  @Test
  public void testSimpleLoadWithFileCopy() throws Exception {
    String testName = tn.getMethodName();
    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
    runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), false, null,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
      false, true, 2);
  }

  /**
   * Test case that creates some regions and loads HFiles that cross the boundaries of those regions
   */
  @Test
  public void testRegionCrossingLoad() throws Exception {
    runTest("testRegionCrossingLoad", BloomType.NONE,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
  }

  /**
   * Test loading into a column family that has a ROW bloom filter.
   */
  @Test
  public void testRegionCrossingRowBloom() throws Exception {
    runTest("testRegionCrossingLoadRowBloom", BloomType.ROW,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
  }

  /**
   * Test loading into a column family that has a ROWCOL bloom filter.
   */
  @Test
  public void testRegionCrossingRowColBloom() throws Exception {
    runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
  }

  /**
   * Test case that creates some regions and loads HFiles that have different region boundaries than
   * the table pre-split.
   */
  @Test
  public void testSimpleHFileSplit() throws Exception {
    runTest("testHFileSplit", BloomType.NONE,
      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
        new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, });
  }

  /**
   * Test case that creates some regions and loads HFiles that cross the boundaries and have
   * different region boundaries than the table pre-split.
   */
  @Test
  public void testRegionCrossingHFileSplit() throws Exception {
    testRegionCrossingHFileSplit(BloomType.NONE);
  }

  /**
   * Test case that creates some regions and loads HFiles that cross the boundaries have a ROW bloom
   * filter and a different region boundaries than the table pre-split.
   */
  @Test
  public void testRegionCrossingHFileSplitRowBloom() throws Exception {
    testRegionCrossingHFileSplit(BloomType.ROW);
  }

  /**
   * Test case that creates some regions and loads HFiles that cross the boundaries have a ROWCOL
   * bloom filter and a different region boundaries than the table pre-split.
   */
  @Test
  public void testRegionCrossingHFileSplitRowColBloom() throws Exception {
    testRegionCrossingHFileSplit(BloomType.ROWCOL);
  }

  // Many small regions force every input HFile to be split repeatedly during the load.
  @Test
  public void testSplitALot() throws Exception {
    runTest("testSplitALot", BloomType.NONE,
      new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"),
        Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"),
        Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
        Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
        Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"),
        Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), },
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, });
  }

  // Shared driver for the bloom-filter HFile-split variants above.
  private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
    runTest("testHFileSplit" + bloomType + "Bloom", bloomType,
      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
  }

  // Builds a single-family table descriptor with the given bloom filter type.
  private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
    return TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(
        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build())
      .build();
  }

  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges)
    throws Exception {
    runTest(testName, bloomType, null, hfileRanges);
  }

  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges, boolean useMap)
    throws Exception {
    runTest(testName, bloomType, null, hfileRanges, useMap);
  }

  private void
runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys, 271 byte[][][] hfileRanges) throws Exception { 272 runTest(testName, bloomType, tableSplitKeys, hfileRanges, false); 273 } 274 275 private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys, 276 byte[][][] hfileRanges, boolean useMap) throws Exception { 277 final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName); 278 final boolean preCreateTable = tableSplitKeys != null; 279 280 // Run the test bulkloading the table to the default namespace 281 final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME); 282 runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, 283 useMap, 2); 284 285 /* 286 * Run the test bulkloading the table from a depth of 3 directory structure is now baseDirectory 287 * -- regionDir -- familyDir -- storeFileDir 288 */ 289 if (preCreateTable) { 290 runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges, false, 291 3); 292 } 293 294 // Run the test bulkloading the table to the specified namespace 295 final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME); 296 runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, 297 2); 298 } 299 300 private void runTest(String testName, TableName tableName, BloomType bloomType, 301 boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, 302 int depth) throws Exception { 303 TableDescriptor htd = buildHTD(tableName, bloomType); 304 runTest(testName, htd, preCreateTable, tableSplitKeys, hfileRanges, useMap, false, depth); 305 } 306 307 public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtil util, 308 byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys, 309 byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount, 310 int factor) throws Exception { 
311 return loadHFiles(testName, htd, util, fam, qual, preCreateTable, tableSplitKeys, hfileRanges, 312 useMap, deleteFile, copyFiles, initRowCount, factor, 2); 313 } 314 315 public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtil util, 316 byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys, 317 byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount, 318 int factor, int depth) throws Exception { 319 Path baseDirectory = util.getDataTestDirOnTestFS(testName); 320 FileSystem fs = util.getTestFileSystem(); 321 baseDirectory = baseDirectory.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 322 Path parentDir = baseDirectory; 323 if (depth == 3) { 324 assert !useMap; 325 parentDir = new Path(baseDirectory, "someRegion"); 326 } 327 Path familyDir = new Path(parentDir, Bytes.toString(fam)); 328 329 int hfileIdx = 0; 330 Map<byte[], List<Path>> map = null; 331 List<Path> list = null; 332 if (useMap || copyFiles) { 333 list = new ArrayList<>(); 334 } 335 if (useMap) { 336 map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 337 map.put(fam, list); 338 } 339 Path last = null; 340 for (byte[][] range : hfileRanges) { 341 byte[] from = range[0]; 342 byte[] to = range[1]; 343 Path path = new Path(familyDir, "hfile_" + hfileIdx++); 344 HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor); 345 if (useMap) { 346 last = path; 347 list.add(path); 348 } 349 } 350 int expectedRows = hfileIdx * factor; 351 352 TableName tableName = htd.getTableName(); 353 if (!util.getAdmin().tableExists(tableName) && (preCreateTable || map != null)) { 354 if (tableSplitKeys != null) { 355 util.getAdmin().createTable(htd, tableSplitKeys); 356 } else { 357 util.getAdmin().createTable(htd); 358 } 359 } 360 361 Configuration conf = util.getConfiguration(); 362 if (copyFiles) { 363 conf.setBoolean(BulkLoadHFiles.ALWAYS_COPY_FILES, true); 364 } 365 BulkLoadHFilesTool loader = new 
BulkLoadHFilesTool(conf); 366 List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString()); 367 if (depth == 3) { 368 args.add("-loadTable"); 369 } 370 371 if (useMap) { 372 if (deleteFile) { 373 fs.delete(last, true); 374 } 375 Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded = loader.bulkLoad(tableName, map); 376 if (deleteFile) { 377 expectedRows -= 1000; 378 for (BulkLoadHFiles.LoadQueueItem item : loaded.keySet()) { 379 if (item.getFilePath().getName().equals(last.getName())) { 380 fail(last + " should be missing"); 381 } 382 } 383 } 384 } else { 385 loader.run(args.toArray(new String[] {})); 386 } 387 388 if (copyFiles) { 389 for (Path p : list) { 390 assertTrue(p + " should exist", fs.exists(p)); 391 } 392 } 393 394 try (Table table = util.getConnection().getTable(tableName)) { 395 assertEquals(initRowCount + expectedRows, countRows(table)); 396 } 397 398 return expectedRows; 399 } 400 401 private void runTest(String testName, TableDescriptor htd, boolean preCreateTable, 402 byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, boolean copyFiles, int depth) 403 throws Exception { 404 loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges, 405 useMap, true, copyFiles, 0, 1000, depth); 406 407 final TableName tableName = htd.getTableName(); 408 // verify staging folder has been cleaned up 409 Path stagingBasePath = new Path(CommonFSUtils.getRootDir(util.getConfiguration()), 410 HConstants.BULKLOAD_STAGING_DIR_NAME); 411 FileSystem fs = util.getTestFileSystem(); 412 if (fs.exists(stagingBasePath)) { 413 FileStatus[] files = fs.listStatus(stagingBasePath); 414 for (FileStatus file : files) { 415 assertTrue("Folder=" + file.getPath() + " is not cleaned up.", 416 file.getPath().getName() != "DONOTERASE"); 417 } 418 } 419 420 util.deleteTable(tableName); 421 } 422 423 /** 424 * Test that tags survive through a bulk load that needs to split hfiles. 
This test depends on the 425 * "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client can get tags in the 426 * responses. 427 */ 428 @Test 429 public void testTagsSurviveBulkLoadSplit() throws Exception { 430 Path dir = util.getDataTestDirOnTestFS(tn.getMethodName()); 431 FileSystem fs = util.getTestFileSystem(); 432 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 433 Path familyDir = new Path(dir, Bytes.toString(FAMILY)); 434 // table has these split points 435 byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), 436 Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), }; 437 438 // creating an hfile that has values that span the split points. 439 byte[] from = Bytes.toBytes("ddd"); 440 byte[] to = Bytes.toBytes("ooo"); 441 HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs, 442 new Path(familyDir, tn.getMethodName() + "_hfile"), FAMILY, QUALIFIER, from, to, 1000); 443 int expectedRows = 1000; 444 445 TableName tableName = TableName.valueOf(tn.getMethodName()); 446 TableDescriptor htd = buildHTD(tableName, BloomType.NONE); 447 util.getAdmin().createTable(htd, tableSplitKeys); 448 449 BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(tableName, dir); 450 451 Table table = util.getConnection().getTable(tableName); 452 try { 453 assertEquals(expectedRows, countRows(table)); 454 HFileTestUtil.verifyTags(table); 455 } finally { 456 table.close(); 457 } 458 459 util.deleteTable(tableName); 460 } 461 462 /** 463 * Test loading into a column family that does not exist. 
464 */ 465 @Test 466 public void testNonexistentColumnFamilyLoad() throws Exception { 467 String testName = tn.getMethodName(); 468 byte[][][] hFileRanges = 469 new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") }, 470 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }; 471 472 byte[] TABLE = Bytes.toBytes("mytable_" + testName); 473 // set real family name to upper case in purpose to simulate the case that 474 // family name in HFiles is invalid 475 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE)) 476 .setColumnFamily(ColumnFamilyDescriptorBuilder 477 .of(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT)))) 478 .build(); 479 480 try { 481 runTest(testName, htd, true, SPLIT_KEYS, hFileRanges, false, false, 2); 482 assertTrue("Loading into table with non-existent family should have failed", false); 483 } catch (Exception e) { 484 assertTrue("IOException expected", e instanceof IOException); 485 // further check whether the exception message is correct 486 String errMsg = e.getMessage(); 487 assertTrue( 488 "Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY 489 + "], current message: [" + errMsg + "]", 490 errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY)); 491 } 492 } 493 494 @Test 495 public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception { 496 testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true); 497 } 498 499 @Test 500 public void testNonHfileFolder() throws Exception { 501 testNonHfileFolder("testNonHfileFolder", false); 502 } 503 504 /** 505 * Write a random data file and a non-file in a dir with a valid family name but not part of the 506 * table families. we should we able to bulkload without getting the unmatched family exception. 
507 * HBASE-13037/HBASE-13227 508 */ 509 private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception { 510 Path dir = util.getDataTestDirOnTestFS(tableName); 511 FileSystem fs = util.getTestFileSystem(); 512 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 513 514 Path familyDir = new Path(dir, Bytes.toString(FAMILY)); 515 HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY, 516 QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500); 517 createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024); 518 519 final String NON_FAMILY_FOLDER = "_logs"; 520 Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER); 521 fs.mkdirs(nonFamilyDir); 522 fs.mkdirs(new Path(nonFamilyDir, "non-file")); 523 createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024); 524 525 Table table = null; 526 try { 527 if (preCreateTable) { 528 table = util.createTable(TableName.valueOf(tableName), FAMILY); 529 } else { 530 table = util.getConnection().getTable(TableName.valueOf(tableName)); 531 } 532 BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(TableName.valueOf(tableName), dir); 533 assertEquals(500, countRows(table)); 534 } finally { 535 if (table != null) { 536 table.close(); 537 } 538 fs.delete(dir, true); 539 } 540 } 541 542 private static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException { 543 FSDataOutputStream stream = fs.create(path); 544 try { 545 byte[] data = new byte[1024]; 546 for (int i = 0; i < data.length; ++i) { 547 data[i] = (byte) (i & 0xff); 548 } 549 while (size >= data.length) { 550 stream.write(data, 0, data.length); 551 size -= data.length; 552 } 553 if (size > 0) { 554 stream.write(data, 0, size); 555 } 556 } finally { 557 stream.close(); 558 } 559 } 560 561 @Test 562 public void testSplitStoreFile() throws IOException { 563 Path dir = util.getDataTestDirOnTestFS("testSplitHFile"); 564 FileSystem fs = 
util.getTestFileSystem(); 565 Path testIn = new Path(dir, "testhfile"); 566 ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY); 567 String tableName = tn.getMethodName(); 568 util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString()); 569 HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER, 570 Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 571 572 Path bottomOut = new Path(dir, "bottom.out"); 573 Path topOut = new Path(dir, "top.out"); 574 575 BulkLoadHFilesTool.splitStoreFile( 576 util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)), 577 util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut); 578 579 int rowCount = verifyHFile(bottomOut); 580 rowCount += verifyHFile(topOut); 581 assertEquals(1000, rowCount); 582 } 583 584 /** 585 * Test hfile splits with the favored nodes 586 */ 587 @Test 588 public void testSplitStoreFileWithFavoriteNodes() throws IOException { 589 590 Path dir = new Path(util.getDefaultRootDirPath(), "testhfile"); 591 FileSystem fs = util.getDFSCluster().getFileSystem(); 592 593 Path testIn = new Path(dir, "testSplitStoreFileWithFavoriteNodes"); 594 ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY); 595 String tableName = tn.getMethodName(); 596 Table table = util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString()); 597 HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER, 598 Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 599 600 Path bottomOut = new Path(dir, "bottom.out"); 601 Path topOut = new Path(dir, "top.out"); 602 603 final AsyncTableRegionLocator regionLocator = 604 util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)); 605 BulkLoadHFilesTool.splitStoreFile(regionLocator, util.getConfiguration(), testIn, familyDesc, 606 Bytes.toBytes("ggg"), bottomOut, topOut); 607 verifyHFileFavoriteNode(topOut, regionLocator, 
fs); 608 verifyHFileFavoriteNode(bottomOut, regionLocator, fs); 609 int rowCount = verifyHFile(bottomOut); 610 rowCount += verifyHFile(topOut); 611 assertEquals(1000, rowCount); 612 } 613 614 @Test 615 public void testSplitStoreFileWithCreateTimeTS() throws IOException { 616 Path dir = util.getDataTestDirOnTestFS("testSplitStoreFileWithCreateTimeTS"); 617 FileSystem fs = util.getTestFileSystem(); 618 Path testIn = new Path(dir, "testhfile"); 619 ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY); 620 String tableName = tn.getMethodName(); 621 util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString()); 622 HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER, 623 Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 624 625 Path bottomOut = new Path(dir, "bottom.out"); 626 Path topOut = new Path(dir, "top.out"); 627 628 BulkLoadHFilesTool.splitStoreFile( 629 util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)), 630 util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut); 631 632 verifyHFileCreateTimeTS(bottomOut); 633 verifyHFileCreateTimeTS(topOut); 634 } 635 636 @Test 637 public void testSplitStoreFileWithNoneToNone() throws IOException { 638 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE); 639 } 640 641 @Test 642 public void testSplitStoreFileWithEncodedToEncoded() throws IOException { 643 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF); 644 } 645 646 @Test 647 public void testSplitStoreFileWithEncodedToNone() throws IOException { 648 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE); 649 } 650 651 @Test 652 public void testSplitStoreFileWithNoneToEncoded() throws IOException { 653 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF); 654 } 655 656 private void 
testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding, 657 DataBlockEncoding cfEncoding) throws IOException { 658 Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding"); 659 FileSystem fs = util.getTestFileSystem(); 660 Path testIn = new Path(dir, "testhfile"); 661 ColumnFamilyDescriptor familyDesc = 662 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build(); 663 String tableName = tn.getMethodName(); 664 util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString()); 665 HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn, 666 bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 667 668 Path bottomOut = new Path(dir, "bottom.out"); 669 Path topOut = new Path(dir, "top.out"); 670 671 BulkLoadHFilesTool.splitStoreFile( 672 util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)), 673 util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut); 674 675 int rowCount = verifyHFile(bottomOut); 676 rowCount += verifyHFile(topOut); 677 assertEquals(1000, rowCount); 678 } 679 680 private int verifyHFile(Path p) throws IOException { 681 Configuration conf = util.getConfiguration(); 682 HFile.Reader reader = 683 HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf); 684 HFileScanner scanner = reader.getScanner(conf, false, false); 685 scanner.seekTo(); 686 int count = 0; 687 do { 688 count++; 689 } while (scanner.next()); 690 assertTrue(count > 0); 691 reader.close(); 692 return count; 693 } 694 695 private void verifyHFileCreateTimeTS(Path p) throws IOException { 696 Configuration conf = util.getConfiguration(); 697 698 try (HFile.Reader reader = 699 HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf)) { 700 long fileCreateTime = reader.getHFileInfo().getHFileContext().getFileCreateTime(); 701 
MatcherAssert.assertThat(fileCreateTime, greaterThan(0L)); 702 } 703 } 704 705 /** 706 * test split storefile with favorite node information 707 */ 708 private void verifyHFileFavoriteNode(Path p, AsyncTableRegionLocator regionLocator, FileSystem fs) 709 throws IOException { 710 Configuration conf = util.getConfiguration(); 711 712 try (HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);) { 713 714 final byte[] firstRowkey = reader.getFirstRowKey().get(); 715 final HRegionLocation hRegionLocation = 716 FutureUtils.get(regionLocator.getRegionLocation(firstRowkey)); 717 718 final String targetHostName = hRegionLocation.getHostname(); 719 720 if (fs instanceof DistributedFileSystem) { 721 String pathStr = p.toUri().getPath(); 722 LocatedBlocks blocks = 723 ((DistributedFileSystem) fs).getClient().getLocatedBlocks(pathStr, 0L); 724 725 boolean isFavoriteNode = false; 726 List<LocatedBlock> locatedBlocks = blocks.getLocatedBlocks(); 727 int index = 0; 728 do { 729 if (index > 0) { 730 assertTrue("failed use favored nodes", isFavoriteNode); 731 } 732 isFavoriteNode = false; 733 final LocatedBlock block = locatedBlocks.get(index); 734 735 final DatanodeInfo[] locations = getLocatedBlockLocations(block); 736 for (DatanodeInfo location : locations) { 737 738 final String hostName = location.getHostName(); 739 if ( 740 targetHostName.equals(hostName.equals("127.0.0.1") 741 ? InetAddress.getLocalHost().getHostName() 742 : "127.0.0.1") || targetHostName.equals(hostName) 743 ) { 744 isFavoriteNode = true; 745 break; 746 } 747 } 748 749 index++; 750 } while (index < locatedBlocks.size()); 751 if (index > 0) { 752 assertTrue("failed use favored nodes", isFavoriteNode); 753 } 754 755 } 756 757 } 758 } 759 760 private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) { 761 Integer value = map.containsKey(first) ? map.get(first) : 0; 762 map.put(first, value + 1); 763 764 value = map.containsKey(last) ? 
map.get(last) : 0; 765 map.put(last, value - 1); 766 } 767 768 @Test 769 public void testInferBoundaries() { 770 TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 771 772 /* 773 * Toy example c---------i o------p s---------t v------x a------e g-----k m-------------q r----s 774 * u----w Should be inferred as: a-----------------k m-------------q r--------------t 775 * u---------x The output should be (m,r,u) 776 */ 777 778 String first; 779 String last; 780 781 first = "a"; 782 last = "e"; 783 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 784 785 first = "r"; 786 last = "s"; 787 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 788 789 first = "o"; 790 last = "p"; 791 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 792 793 first = "g"; 794 last = "k"; 795 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 796 797 first = "v"; 798 last = "x"; 799 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 800 801 first = "c"; 802 last = "i"; 803 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 804 805 first = "m"; 806 last = "q"; 807 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 808 809 first = "s"; 810 last = "t"; 811 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 812 813 first = "u"; 814 last = "w"; 815 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 816 817 byte[][] keysArray = BulkLoadHFilesTool.inferBoundaries(map); 818 byte[][] compare = new byte[3][]; 819 compare[0] = Bytes.toBytes("m"); 820 compare[1] = Bytes.toBytes("r"); 821 compare[2] = Bytes.toBytes("u"); 822 823 assertEquals(3, keysArray.length); 824 825 for (int row = 0; row < keysArray.length; row++) { 826 assertArrayEquals(keysArray[row], compare[row]); 827 } 828 } 829 830 @Test 831 public void testLoadTooMayHFiles() throws Exception { 832 Path dir = 
util.getDataTestDirOnTestFS("testLoadTooMayHFiles"); 833 FileSystem fs = util.getTestFileSystem(); 834 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 835 Path familyDir = new Path(dir, Bytes.toString(FAMILY)); 836 837 byte[] from = Bytes.toBytes("begin"); 838 byte[] to = Bytes.toBytes("end"); 839 for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) { 840 HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" + i), 841 FAMILY, QUALIFIER, from, to, 1000); 842 } 843 844 try { 845 BulkLoadHFiles.create(util.getConfiguration()) 846 .bulkLoad(TableName.valueOf("mytable_testLoadTooMayHFiles"), dir); 847 fail("Bulk loading too many files should fail"); 848 } catch (IOException ie) { 849 assertTrue(ie.getMessage() 850 .contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles")); 851 } 852 } 853 854 @Test(expected = TableNotFoundException.class) 855 public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception { 856 Configuration conf = util.getConfiguration(); 857 conf.set(BulkLoadHFiles.CREATE_TABLE_CONF_KEY, "no"); 858 BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf); 859 String[] args = { "directory", "nonExistingTable" }; 860 loader.run(args); 861 } 862 863 @Test 864 public void testTableWithCFNameStartWithUnderScore() throws Exception { 865 Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore"); 866 FileSystem fs = util.getTestFileSystem(); 867 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 868 String family = "_cf"; 869 Path familyDir = new Path(dir, family); 870 871 byte[] from = Bytes.toBytes("begin"); 872 byte[] to = Bytes.toBytes("end"); 873 Configuration conf = util.getConfiguration(); 874 String tableName = tn.getMethodName(); 875 try (Table table = util.createTable(TableName.valueOf(tableName), family)) { 876 HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family), 877 QUALIFIER, from, to, 1000); 878 
BulkLoadHFiles.create(conf).bulkLoad(table.getName(), dir); 879 assertEquals(1000, countRows(table)); 880 } 881 } 882 883 @Test 884 public void testBulkLoadByFamily() throws Exception { 885 Path dir = util.getDataTestDirOnTestFS("testBulkLoadByFamily"); 886 FileSystem fs = util.getTestFileSystem(); 887 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 888 String tableName = tn.getMethodName(); 889 String[] families = { "cf1", "cf2", "cf3" }; 890 for (int i = 0; i < families.length; i++) { 891 byte[] from = Bytes.toBytes(i + "begin"); 892 byte[] to = Bytes.toBytes(i + "end"); 893 Path familyDir = new Path(dir, families[i]); 894 HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile"), 895 Bytes.toBytes(families[i]), QUALIFIER, from, to, 1000); 896 } 897 Table table = util.createTable(TableName.valueOf(tableName), families); 898 final AtomicInteger attmptedCalls = new AtomicInteger(); 899 util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true); 900 BulkLoadHFiles loader = new BulkLoadHFilesTool(util.getConfiguration()) { 901 @Override 902 protected CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad( 903 final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles, 904 final byte[] first, Collection<LoadQueueItem> lqis) { 905 attmptedCalls.incrementAndGet(); 906 return super.tryAtomicRegionLoad(conn, tableName, copyFiles, first, lqis); 907 } 908 }; 909 try { 910 loader.bulkLoad(table.getName(), dir); 911 assertEquals(families.length, attmptedCalls.get()); 912 assertEquals(1000 * families.length, HBaseTestingUtil.countRows(table)); 913 } finally { 914 if (null != table) { 915 table.close(); 916 } 917 util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false); 918 } 919 } 920 921 @Test 922 public void testFailIfNeedSplitHFile() throws IOException { 923 TableName tableName = TableName.valueOf(tn.getMethodName()); 924 Table table = 
util.createTable(tableName, FAMILY); 925 926 util.loadTable(table, FAMILY); 927 928 FileSystem fs = util.getTestFileSystem(); 929 Path sfPath = new Path(fs.getWorkingDirectory(), new Path(Bytes.toString(FAMILY), "file")); 930 HFileTestUtil.createHFile(util.getConfiguration(), fs, sfPath, FAMILY, QUALIFIER, 931 Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 932 933 util.getAdmin().split(tableName); 934 util.waitFor(10000, 1000, () -> util.getAdmin().getRegions(tableName).size() > 1); 935 936 Configuration config = new Configuration(util.getConfiguration()); 937 config.setBoolean(BulkLoadHFilesTool.FAIL_IF_NEED_SPLIT_HFILE, true); 938 BulkLoadHFilesTool tool = new BulkLoadHFilesTool(config); 939 940 String[] args = new String[] { fs.getWorkingDirectory().toString(), tableName.toString() }; 941 assertThrows(IOException.class, () -> tool.run(args)); 942 util.getHBaseCluster().getRegions(tableName) 943 .forEach(r -> assertEquals(1, r.getStore(FAMILY).getStorefiles().size())); 944 } 945}