/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Test cases for the atomic load error handling of the bulk load functionality.
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestBulkLoadHFilesSplitRecovery {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBulkLoadHFilesSplitRecovery.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestBulkLoadHFilesSplitRecovery.class);

  static HBaseTestingUtil util;
  // used by secure subclass
  static boolean useSecure = false;

  final static int NUM_CFS = 10;
  final static byte[] QUAL = Bytes.toBytes("qual");
  final static int ROWCOUNT = 100;

  private final static byte[][] families = new byte[NUM_CFS][];

  @Rule
  public TestName name = new TestName();

  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

  static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }

  static byte[] value(int i) {
    return Bytes.toBytes(String.format("%010d", i));
  }

  public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException {
    byte[] val = value(value);
    for (int i = 0; i < NUM_CFS; i++) {
      Path testIn = new Path(dir, family(i));

      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
        Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
    }
  }

  private TableDescriptor createTableDesc(TableName name, int cfs) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
    IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i)))
      .forEachOrdered(builder::setColumnFamily);
    return builder.build();
  }

  /**
   * Creates a table with the given table name and the specified number of column families if the
   * table does not already exist.
   */
  private void setupTable(final Connection connection, TableName table, int cfs)
    throws IOException {
    try {
      LOG.info("Creating table " + table);
      try (Admin admin = connection.getAdmin()) {
        admin.createTable(createTableDesc(table, cfs));
      }
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

  /**
   * Creates a table with the given table name, the specified number of column families and the
   * given split keys if the table does not already exist.
   */
  private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
    throws IOException {
    try {
      LOG.info("Creating table " + table);
      util.createTable(createTableDesc(table, cfs), SPLIT_KEYS);
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

  private Path buildBulkFiles(TableName table, int value) throws Exception {
    Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
    Path bulk1 = new Path(dir, table.getNameAsString() + value);
    FileSystem fs = util.getTestFileSystem();
    buildHFiles(fs, bulk1, value);
    return bulk1;
  }
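
  // A note on the fixture data (illustrative, derived from the helpers above): a call like
  // buildBulkFiles(table, 2) creates one HFile per column family under the returned directory,
  // e.g.
  //
  //   <dataTestDirOnTestFS>/<table>/<table>2/family_0000/hfile_0
  //   <dataTestDirOnTestFS>/<table>/<table>2/family_0001/hfile_1
  //   ...
  //   <dataTestDirOnTestFS>/<table>/<table>2/family_0009/hfile_9
  //
  // Each HFile contains ROWCOUNT rows (the split-key tests below assume keys of the form
  // row_00000000 .. row_00000099), the single qualifier QUAL and the value produced by value(2).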

  /**
   * Populate table with known values.
   */
  private void populateTable(final Connection connection, TableName table, int value)
    throws Exception {
    // create HFiles for different column families
    Path dir = buildBulkFiles(table, value);
    BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(table, dir);
  }

  /**
   * Split the known table in half (this is hard-coded for this test suite).
   */
  private void forceSplit(TableName table) {
    try {
      // Going through the region server directly would make the split synchronous, but that API
      // is not visible here, so issue an async split and poll for completion below.
      HRegionServer hrs = util.getRSForFirstRegionInTable(table);

      for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
        if (hri.getTable().equals(table)) {
          util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2));
          // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
        }
      }

      // Verify that the split completed.
      int regions;
      do {
        regions = 0;
        for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
          if (hri.getTable().equals(table)) {
            regions++;
          }
        }
        if (regions != 2) {
          LOG.info("Taking some time to complete split...");
          Thread.sleep(250);
        }
      } while (regions != 2);
    } catch (IOException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  @BeforeClass
  public static void setupCluster() throws Exception {
    util = new HBaseTestingUtil();
    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    util.startMiniCluster(1);
  }

  @AfterClass
  public static void teardownCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  /**
   * Checks that all columns have the expected value and that there is the expected number of rows.
   */
  void assertExpectedTable(TableName table, int count, int value) throws IOException {
    TableDescriptor htd = util.getAdmin().getDescriptor(table);
    assertNotNull(htd);
    try (Table t = util.getConnection().getTable(table);
      ResultScanner sr = t.getScanner(new Scan())) {
      int i = 0;
      for (Result r; (r = sr.next()) != null;) {
        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
          .forEach(v -> assertArrayEquals(value(value), v));
        i++;
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception");
    }
  }

  private static <T> CompletableFuture<T> failedFuture(Throwable error) {
    CompletableFuture<T> future = new CompletableFuture<>();
    future.completeExceptionally(error);
    return future;
  }
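
  // The helper below is how the tests simulate a server-side bulk load failure: it wraps the real
  // cluster connection in a Mockito spy whose bulkLoad(...) call always returns the already-failed
  // future from failedFuture(), so callers observe an IOException as if the region servers had
  // rejected the load, while every other method still delegates to the real connection.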

  private static AsyncClusterConnection mockAndInjectError(AsyncClusterConnection conn) {
    AsyncClusterConnection errConn = spy(conn);
    doReturn(failedFuture(new IOException("injecting bulk load error"))).when(errConn).bulkLoad(
      any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList(), anyBoolean());
    return errConn;
  }

  /**
   * Test that shows that an exception thrown from the RS side will result in an exception on the
   * bulk load client.
   */
  @Test(expected = IOException.class)
  public void testBulkLoadPhaseFailure() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    final AtomicInteger attemptedCalls = new AtomicInteger();
    Configuration conf = new Configuration(util.getConfiguration());
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf) {

      @Override
      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        AsyncClusterConnection c =
          attemptedCalls.incrementAndGet() == 1 ? mockAndInjectError(conn) : conn;
        super.bulkLoadPhase(c, tableName, queue, regionGroups, copyFiles, item2RegionMap);
      }
    };
    Path dir = buildBulkFiles(table, 1);
    loader.bulkLoad(table, dir);
  }

  /**
   * Test that shows that an exception thrown from the RS side results in the number of retries
   * set by {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when
   * {@link BulkLoadHFiles#RETRY_ON_IO_EXCEPTION} is enabled.
   */
  @Test
  public void testRetryOnIOException() throws Exception {
    TableName table = TableName.valueOf(name.getMethodName());
    AtomicInteger calls = new AtomicInteger(0);
    setupTable(util.getConnection(), table, 10);
    Configuration conf = new Configuration(util.getConfiguration());
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    conf.setBoolean(BulkLoadHFiles.RETRY_ON_IO_EXCEPTION, true);
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf) {

      @Override
      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        if (
          calls.get() < conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)
        ) {
          calls.incrementAndGet();
          super.bulkLoadPhase(mockAndInjectError(conn), tableName, queue, regionGroups, copyFiles,
            item2RegionMap);
        } else {
          super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
        }
      }
    };
    Path dir = buildBulkFiles(table, 1);
    loader.bulkLoad(table, dir);
    assertEquals(2, calls.get());
  }
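
  // For reference, the retry behaviour exercised above is opt-in and driven purely by client
  // configuration; a caller would enable it roughly like this (a sketch mirroring the settings
  // used in the test, with placeholder variable names, not a prescribed recipe):
  //
  //   Configuration conf = new Configuration(clusterConf);
  //   conf.setBoolean(BulkLoadHFiles.RETRY_ON_IO_EXCEPTION, true);
  //   conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
  //   BulkLoadHFiles.create(conf).bulkLoad(tableName, hfileDir);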

  /**
   * This test exercises the path where there is a split after initial validation but before the
   * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a
   * split just before the atomic region load.
   */
  @Test
  public void testSplitWhileBulkLoadPhase() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    setupTable(util.getConnection(), table, 10);
    populateTable(util.getConnection(), table, 1);
    assertExpectedTable(table, ROWCOUNT, 1);

    // Now let's cause trouble. This will occur after the checks and cause the bulk
    // files to fail when we attempt to atomically import them. This is recoverable.
    final AtomicInteger attemptedCalls = new AtomicInteger();
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {

      @Override
      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        int i = attemptedCalls.incrementAndGet();
        if (i == 1) {
          // On the first attempt force a split.
          forceSplit(table);
        }
        super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
      }
    };

    // create HFiles for different column families
    Path dir = buildBulkFiles(table, 2);
    loader.bulkLoad(table, dir);

    // Check that the data was loaded. The three expected attempts are 1) failure because a split
    // is needed, 2) load of the split top half and 3) load of the split bottom half.
    assertEquals(3, attemptedCalls.get());
    assertExpectedTable(table, ROWCOUNT, 2);
  }

  /**
   * This test splits a table and attempts to bulk load. The bulk import files should be split
   * before atomically importing.
   */
  @Test
  public void testGroupOrSplitPresplit() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    setupTable(util.getConnection(), table, 10);
    populateTable(util.getConnection(), table, 1);
    assertExpectedTable(util.getConnection(), table, ROWCOUNT, 1);
    forceSplit(table);

    final AtomicInteger countedLqis = new AtomicInteger();
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {

      @Override
      protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn,
        TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item,
        List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
        Pair<List<LoadQueueItem>, String> lqis =
          super.groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
        if (lqis != null && lqis.getFirst() != null) {
          countedLqis.addAndGet(lqis.getFirst().size());
        }
        return lqis;
      }
    };

    // create HFiles for different column families
    Path dir = buildBulkFiles(table, 2);
    loader.bulkLoad(table, dir);
    assertExpectedTable(util.getConnection(), table, ROWCOUNT, 2);
    // 20 = NUM_CFS * 2: each family's HFile is expected to be split once around the region split.
    assertEquals(20, countedLqis.get());
  }

  @Test
  public void testCorrectSplitPoint() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
      Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
      Bytes.toBytes("row_00000050"), Bytes.toBytes("row_00000060"),
      Bytes.toBytes("row_00000070") };
    setupTableWithSplitkeys(table, NUM_CFS, SPLIT_KEYS);

    final AtomicInteger bulkloadRpcTimes = new AtomicInteger();
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {

      @Override
      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        bulkloadRpcTimes.addAndGet(1);
        super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
      }
    };

    Path dir = buildBulkFiles(table, 1);
    loader.bulkLoad(table, dir);
    // Before HBASE-25281 we needed to invoke the bulk load RPC 8 times.
    assertEquals(4, bulkloadRpcTimes.get());
  }
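
  // For context (an interpretation of the fixture above, not a statement about the tool's
  // internals): the seven split keys create eight regions, and each family's single HFile covers
  // rows row_00000000..row_00000099, so it spans every region and has to be split before it can
  // be loaded. Per the comment above, HBASE-25281 improved the choice of split points so the
  // whole load now finishes in 4 bulk load RPC rounds instead of 8.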

  /**
   * This test creates a table with many small regions. The bulk load files have to be split
   * multiple times before all of them can be loaded successfully.
   */
  @Test
  public void testSplitTmpFileCleanUp() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
      Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
      Bytes.toBytes("row_00000050") };
    setupTableWithSplitkeys(table, 10, SPLIT_KEYS);

    BulkLoadHFiles loader = BulkLoadHFiles.create(util.getConfiguration());

    // create HFiles
    Path dir = buildBulkFiles(table, 2);
    loader.bulkLoad(table, dir);
    // family path
    Path tmpPath = new Path(dir, family(0));
    // TMP_DIR under the family path
    tmpPath = new Path(tmpPath, BulkLoadHFilesTool.TMP_DIR);
    FileSystem fs = dir.getFileSystem(util.getConfiguration());
    // HFiles have been split, so TMP_DIR exists
    assertTrue(fs.exists(tmpPath));
    // TMP_DIR should have been cleaned up
    assertNull(BulkLoadHFilesTool.TMP_DIR + " should be empty.",
      CommonFSUtils.listStatus(fs, tmpPath));
    assertExpectedTable(util.getConnection(), table, ROWCOUNT, 2);
  }

  /**
   * This simulates a remote exception which should cause the bulk load to exit with an exception.
   */
  @Test(expected = IOException.class)
  public void testGroupOrSplitFailure() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    setupTable(util.getConnection(), tableName, 10);
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {

      private int i = 0;

      @Override
      protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn,
        TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item,
        List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
        i++;

        if (i == 5) {
          throw new IOException("failure");
        }
        return super.groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
      }
    };

    // create HFiles for different column families
    Path dir = buildBulkFiles(tableName, 1);
    loader.bulkLoad(tableName, dir);
  }

  /**
   * We are testing a split after initial validation but before the atomic bulk load call. We
   * cannot use presplitting to test this path, so we actually inject a split just before the
   * atomic region load. However, we will pass a null item2RegionMap and that should not affect the
   * bulk load behavior.
   */
  @Test
  public void testSplitWhileBulkLoadPhaseWithoutItemMap() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    setupTable(util.getConnection(), table, 10);
    populateTable(util.getConnection(), table, 1);
    assertExpectedTable(table, ROWCOUNT, 1);

    // Now let's cause trouble. This will occur after the checks and cause the bulk
    // files to fail when we attempt to atomically import them. This is recoverable.
    final AtomicInteger attemptedCalls = new AtomicInteger();
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {

      @Override
      protected void bulkLoadPhase(final AsyncClusterConnection conn, final TableName tableName,
        final Deque<LoadQueueItem> queue, final Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        final boolean copyFiles, final Map<LoadQueueItem, ByteBuffer> item2RegionMap)
        throws IOException {

        int i = attemptedCalls.incrementAndGet();
        if (i == 1) {
          // On the first attempt force a split.
          forceSplit(table);
        }

        // Pass item2RegionMap as null; the bulk load should still work as expected without the
        // item-to-region map.
        super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, null);
      }

    };

    // create HFiles for different column families
    Path dir = buildBulkFiles(table, 2);
    loader.bulkLoad(table, dir);

    // Check that the data was loaded. The three expected attempts are 1) failure because a split
    // is needed, 2) load of the split top half and 3) load of the split bottom half.
    assertEquals(3, attemptedCalls.get());
    assertExpectedTable(table, ROWCOUNT, 2);
  }

  /**
   * Checks that all columns have the expected value and that there is the expected number of rows.
   */
  void assertExpectedTable(final Connection connection, TableName table, int count, int value)
    throws IOException {
    TableDescriptor htd = util.getAdmin().getDescriptor(table);
    assertNotNull(htd);
    try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) {
      int i = 0;
      for (Result r; (r = sr.next()) != null;) {
        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
          .forEach(v -> assertArrayEquals(value(value), v));
        i++;
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception");
    }
  }
}