/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;

/**
 * Test cases for the atomic load error handling of the bulk load functionality.
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestLoadIncrementalHFilesSplitRecovery {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLoadIncrementalHFilesSplitRecovery.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestLoadIncrementalHFilesSplitRecovery.class);

  static HBaseTestingUtility util;
  // used by secure subclass
  static boolean useSecure = false;

  final static int NUM_CFS = 10;
  final static byte[] QUAL = Bytes.toBytes("qual");
  final static int ROWCOUNT = 100;

  private final static byte[][] families = new byte[NUM_CFS][];

  @Rule
  public TestName name = new TestName();

  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

  static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }

  static byte[] value(int i) {
    return Bytes.toBytes(String.format("%010d", i));
  }

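  // Row keys run from "row_00000000" to "row_00000099" (ROWCOUNT rows). Every column family
  // shares the same rows, and the cell value encodes the bulk load round that wrote it, so
  // repeated loads into the same table are distinguishable.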
  public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException {
    byte[] val = value(value);
    for (int i = 0; i < NUM_CFS; i++) {
      Path testIn = new Path(dir, family(i));

      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
        Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
    }
  }

  private TableDescriptor createTableDesc(TableName name, int cfs) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
    IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i)))
      .forEachOrdered(builder::setColumnFamily);
    return builder.build();
  }

  /**
   * Creates a table with the given table name and the specified number of column families if the
   * table does not already exist.
   */
  private void setupTable(final Connection connection, TableName table, int cfs)
    throws IOException {
    try {
      LOG.info("Creating table " + table);
      try (Admin admin = connection.getAdmin()) {
        admin.createTable(createTableDesc(table, cfs));
      }
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

  /**
   * Creates a table with the given table name, the specified number of column families, and the
   * given split keys if the table does not already exist.
   */
  private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
    throws IOException {
    try {
      LOG.info("Creating table " + table);
      util.createTable(createTableDesc(table, cfs), SPLIT_KEYS);
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

  private Path buildBulkFiles(TableName table, int value) throws Exception {
    Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
    Path bulk1 = new Path(dir, table.getNameAsString() + value);
    FileSystem fs = util.getTestFileSystem();
    buildHFiles(fs, bulk1, value);
    return bulk1;
  }

  /**
   * Populate table with known values.
   */
  private void populateTable(final Connection connection, TableName table, int value)
    throws Exception {
    // create HFiles for different column families
    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
    Path bulk1 = buildBulkFiles(table, value);
    try (Table t = connection.getTable(table);
      RegionLocator locator = connection.getRegionLocator(table);
      Admin admin = connection.getAdmin()) {
      lih.doBulkLoad(bulk1, admin, t, locator);
    }
  }

  /**
   * Split the known table in half. (this is hard coded for this test suite)
   */
  private void forceSplit(TableName table) {
    try {
      // We would like the region server split call to be synchronous, but that API is not
      // visible here, so request an async split and poll for completion below.
      HRegionServer hrs = util.getRSForFirstRegionInTable(table);

      for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
        if (hri.getTable().equals(table)) {
          util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2));
          // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
        }
      }

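      // splitRegionAsync only submits the request; the daughter regions come online
      // asynchronously, so poll the region server until both halves are visible.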
      int regions;
      do {
        regions = 0;
        for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
          if (hri.getTable().equals(table)) {
            regions++;
          }
        }
        if (regions != 2) {
          LOG.info("Taking some time to complete split...");
          Thread.sleep(250);
        }
      } while (regions != 2);
    } catch (IOException e) {
      LOG.error("Failed to force split", e);
    } catch (InterruptedException e) {
      LOG.error("Interrupted while waiting for split", e);
    }
  }

  @BeforeClass
  public static void setupCluster() throws Exception {
    util = new HBaseTestingUtility();
    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    util.startMiniCluster(1);
  }

  @AfterClass
  public static void teardownCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  /**
   * Checks that all columns have the expected value and that there is the expected number of rows.
   */
  void assertExpectedTable(TableName table, int count, int value) throws IOException {
    TableDescriptor htd = util.getAdmin().getDescriptor(table);
    assertNotNull(htd);
    try (Table t = util.getConnection().getTable(table);
      ResultScanner sr = t.getScanner(new Scan())) {
      int i = 0;
      for (Result r; (r = sr.next()) != null;) {
        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
          .forEach(v -> assertArrayEquals(value(value), v));
        i++;
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception: " + e);
    }
  }

  /**
   * Test that shows that an exception thrown from the RS side will result in an exception on the
   * LIHFile client.
   */
  @Test(expected = IOException.class)
  public void testBulkLoadPhaseFailure() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    final AtomicInteger attemptedCalls = new AtomicInteger();
    final AtomicInteger failedCalls = new AtomicInteger();
    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);
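      // Route the first atomic load attempt through a mocked connection whose bulkLoadHFile
      // RPC always fails; the failure should surface to the caller as an IOException.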
      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
        @Override
        protected List<LoadQueueItem> tryAtomicRegionLoad(Connection connection,
          TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis,
          boolean copyFile) throws IOException {
          int i = attemptedCalls.incrementAndGet();
          if (i == 1) {
            Connection errConn;
            try {
              errConn = getMockedConnection(util.getConfiguration());
            } catch (Exception e) {
              LOG.error(HBaseMarkers.FATAL, "mocking cruft, should never happen", e);
              throw new RuntimeException("mocking cruft, should never happen");
            }
            failedCalls.incrementAndGet();
            return super.tryAtomicRegionLoad(errConn, tableName, first, lqis, true);
          }

          return super.tryAtomicRegionLoad(connection, tableName, first, lqis, true);
        }
      };
      try {
        // create HFiles for different column families
        Path dir = buildBulkFiles(table, 1);
        try (Table t = connection.getTable(table);
          RegionLocator locator = connection.getRegionLocator(table);
          Admin admin = connection.getAdmin()) {
          lih.doBulkLoad(dir, admin, t, locator);
        }
      } finally {
        util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
      }
      fail("doBulkLoad should have thrown an exception");
    }
  }

  /**
   * Test that shows that an exception thrown from the RS side will result in the expected number
   * of retries set by {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when
   * {@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set.
   */
  @Test
  public void testRetryOnIOException() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    final AtomicInteger calls = new AtomicInteger(0);
    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
    final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
      @Override
      protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn,
        TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
        if (
          calls.get() < util.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)
        ) {
          calls.getAndIncrement();
          return new ClientServiceCallable<byte[]>(conn, tableName, first,
            new RpcControllerFactory(util.getConfiguration()).newController(),
            HConstants.PRIORITY_UNSET, Collections.emptyMap()) {
            @Override
            public byte[] rpcCall() throws Exception {
              throw new IOException("Error calling something on RegionServer");
            }
          };
        } else {
          return super.buildClientServiceCallable(conn, tableName, first, lqis, true);
        }
      }
    };
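    // The override above hands out a callable that always fails until the retry cap (2) is
    // reached, then delegates to the real implementation, so the load succeeds after exactly
    // two failed attempts (asserted below).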
    try (Connection conn = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(conn, table, 10);
      Path dir = buildBulkFiles(table, 1);
      try (Table t = conn.getTable(table); RegionLocator locator = conn.getRegionLocator(table);
        Admin admin = conn.getAdmin()) {
        lih.doBulkLoad(dir, admin, t, locator);
      }
      assertEquals(2, calls.get());
    } finally {
      util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
    }
  }

  private ClusterConnection getMockedConnection(final Configuration conf)
    throws IOException, ServiceException {
    ClusterConnection c = Mockito.mock(ClusterConnection.class);
    Mockito.when(c.getConfiguration()).thenReturn(conf);
    Mockito.doNothing().when(c).close();
    // Make it so we return a particular location when asked.
    final HRegionLocation loc = new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO,
      ServerName.valueOf("example.org", 1234, 0));
    Mockito.when(
      c.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(), Mockito.anyBoolean()))
      .thenReturn(loc);
    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).thenReturn(loc);
    ClientProtos.ClientService.BlockingInterface hri =
      Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
    Mockito
      .when(hri.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any()))
      .thenThrow(new ServiceException(new IOException("injecting bulk load error")));
    Mockito.when(c.getClient(Mockito.any())).thenReturn(hri);
    return c;
  }

  /**
   * This test exercises the path where there is a split after initial validation but before the
   * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a
   * split just before the atomic region load.
   */
  @Test
  public void testSplitWhileBulkLoadPhase() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);
      populateTable(connection, table, 1);
      assertExpectedTable(table, ROWCOUNT, 1);

      // Now let's cause trouble. This will occur after checks and cause bulk
      // files to fail when attempting to atomically import. This is recoverable.
      final AtomicInteger attemptedCalls = new AtomicInteger();
      LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) {
        @Override
        protected void bulkLoadPhase(final Table htable, final Connection conn,
          ExecutorService pool, Deque<LoadQueueItem> queue,
          final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
          Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
          int i = attemptedCalls.incrementAndGet();
          if (i == 1) {
            // On first attempt force a split.
            forceSplit(table);
          }
          super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
        }
      };

      // create HFiles for different column families
      try (Table t = connection.getTable(table);
        RegionLocator locator = connection.getRegionLocator(table);
        Admin admin = connection.getAdmin()) {
        Path bulk = buildBulkFiles(table, 2);
        lih2.doBulkLoad(bulk, admin, t, locator);
      }

      // Check that the data was loaded. The three expected attempts are: 1) a failure because
      // the region has split, 2) the load of the split top half, and 3) the load of the split
      // bottom half.
      assertEquals(3, attemptedCalls.get());
      assertExpectedTable(table, ROWCOUNT, 2);
    }
  }

  /**
   * This test splits a table and attempts to bulk load. The bulk import files should be split
   * before atomically importing.
   */
  @Test
  public void testGroupOrSplitPresplit() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);
      populateTable(connection, table, 1);
      assertExpectedTable(connection, table, ROWCOUNT, 1);
      forceSplit(table);

      final AtomicInteger countedLqis = new AtomicInteger();
      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
        @Override
        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
          Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
          final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
          Pair<List<LoadQueueItem>, String> lqis =
            super.groupOrSplit(regionGroups, item, htable, startEndKeys);
          if (lqis != null && lqis.getFirst() != null) {
            countedLqis.addAndGet(lqis.getFirst().size());
          }
          return lqis;
        }
      };

      // create HFiles for different column families
      Path bulk = buildBulkFiles(table, 2);
      try (Table t = connection.getTable(table);
        RegionLocator locator = connection.getRegionLocator(table);
        Admin admin = connection.getAdmin()) {
        lih.doBulkLoad(bulk, admin, t, locator);
      }
      assertExpectedTable(connection, table, ROWCOUNT, 2);
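      // Each of the NUM_CFS (10) HFiles straddles the new split point, so each is split into a
      // top and a bottom half, for 20 load queue items in total.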
      assertEquals(20, countedLqis.get());
    }
  }

  @Test
  public void testCorrectSplitPoint() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
      Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
      Bytes.toBytes("row_00000050"), Bytes.toBytes("row_00000060"),
      Bytes.toBytes("row_00000070") };
    setupTableWithSplitkeys(table, NUM_CFS, SPLIT_KEYS);

    final AtomicInteger bulkloadRpcTimes = new AtomicInteger();
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {

      @Override
      protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool,
        Deque<LoadIncrementalHFiles.LoadQueueItem> queue,
        Multimap<ByteBuffer, LoadIncrementalHFiles.LoadQueueItem> regionGroups, boolean copyFile,
        Map<LoadIncrementalHFiles.LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        bulkloadRpcTimes.addAndGet(1);
        super.bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
      }
    };

    Path dir = buildBulkFiles(table, 1);
    loader.bulkLoad(table, dir);
    // Before HBASE-25281 we needed to invoke the bulk load RPC 8 times.
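    // With the corrected split point, a file spanning many regions no longer has to be re-split
    // once per region; here the files covering the 8 regions load in 4 passes instead of 8.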
    assertEquals(4, bulkloadRpcTimes.get());
  }

  /**
   * This test creates a table with many small regions. The bulk load files must be split multiple
   * times before all of them can be loaded successfully.
   */
  @Test
  public void testSplitTmpFileCleanUp() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
      Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
      Bytes.toBytes("row_00000050") };
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTableWithSplitkeys(table, 10, SPLIT_KEYS);

      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());

      // create HFiles
      Path bulk = buildBulkFiles(table, 2);
      try (Table t = connection.getTable(table);
        RegionLocator locator = connection.getRegionLocator(table);
        Admin admin = connection.getAdmin()) {
        lih.doBulkLoad(bulk, admin, t, locator);
      }
      // family path
      Path tmpPath = new Path(bulk, family(0));
      // TMP_DIR under family path
      tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR);
      FileSystem fs = bulk.getFileSystem(util.getConfiguration());
      // HFiles have been split, so TMP_DIR was created
      assertTrue(fs.exists(tmpPath));
      // TMP_DIR should have been cleaned up afterwards
      assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.",
        CommonFSUtils.listStatus(fs, tmpPath));
      assertExpectedTable(connection, table, ROWCOUNT, 2);
    }
  }

  /**
   * This simulates a remote exception which should cause LIHF to exit with an exception.
   */
  @Test(expected = IOException.class)
  public void testGroupOrSplitFailure() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, tableName, 10);

      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
        int i = 0;

        @Override
        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
          Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
          final Table table, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
          i++;

          if (i == 5) {
            throw new IOException("failure");
          }
          return super.groupOrSplit(regionGroups, item, table, startEndKeys);
        }
      };

      // create HFiles for different column families
      Path dir = buildBulkFiles(tableName, 1);
      try (Table t = connection.getTable(tableName);
        RegionLocator locator = connection.getRegionLocator(tableName);
        Admin admin = connection.getAdmin()) {
        lih.doBulkLoad(dir, admin, t, locator);
      }
    }

    fail("doBulkLoad should have thrown an exception");
  }

  @Test
  public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
    // Share the connection. We were failing to find the table with our new reverse scan because
    // it looks for the first region, not any region -- that is how it works now. The code below
    // removes the first region in the test; it used to rely on the Connection cache holding the
    // first region.
    Connection connection = ConnectionFactory.createConnection(util.getConfiguration());
    Table table = connection.getTable(tableName);

    setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
    Path dir = buildBulkFiles(tableName, 2);

    final AtomicInteger countedLqis = new AtomicInteger();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {

      @Override
      protected Pair<List<LoadQueueItem>, String> groupOrSplit(
        Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
        final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
        Pair<List<LoadQueueItem>, String> lqis =
          super.groupOrSplit(regionGroups, item, htable, startEndKeys);
        if (lqis != null && lqis.getFirst() != null) {
          countedLqis.addAndGet(lqis.getFirst().size());
        }
        return lqis;
      }
    };

    // do bulkload when there is no region hole in hbase:meta.
    try (Table t = connection.getTable(tableName);
      RegionLocator locator = connection.getRegionLocator(tableName);
      Admin admin = connection.getAdmin()) {
      loader.doBulkLoad(dir, admin, t, locator);
    } catch (Exception e) {
      LOG.error("exception=", e);
    }
    // check if all the data are loaded into the table.
    this.assertExpectedTable(tableName, ROWCOUNT, 2);

    dir = buildBulkFiles(tableName, 3);

    // Mess it up by leaving a hole in the hbase:meta
    List<RegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
    for (RegionInfo regionInfo : regionInfos) {
      if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
        MetaTableAccessor.deleteRegionInfo(connection, regionInfo);
        break;
      }
    }

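    // With the first region's row gone from hbase:meta, the loader cannot place files for that
    // key range, so this load is expected to fail with an IOException (asserted in the catch
    // below) while the data from the previous round stays intact.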
    try (Table t = connection.getTable(tableName);
      RegionLocator locator = connection.getRegionLocator(tableName);
      Admin admin = connection.getAdmin()) {
      loader.doBulkLoad(dir, admin, t, locator);
    } catch (Exception e) {
      LOG.error("exception=", e);
      assertTrue("IOException expected", e instanceof IOException);
    }

    table.close();

    // Make sure at least the one region that still exists can be found.
    regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
    assertTrue(regionInfos.size() >= 1);

    this.assertExpectedTable(connection, tableName, ROWCOUNT, 2);
    connection.close();
  }

  /**
   * Checks that all columns have the expected value and that there is the expected number of rows.
   */
  void assertExpectedTable(final Connection connection, TableName table, int count, int value)
    throws IOException {
    TableDescriptor htd = util.getAdmin().getDescriptor(table);
    assertNotNull(htd);
    try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) {
      int i = 0;
      for (Result r; (r = sr.next()) != null;) {
        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
          .forEach(v -> assertArrayEquals(value(value), v));
        i++;
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception: " + e);
    }
  }
}