001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.List; 029import java.util.Objects; 030import java.util.concurrent.ThreadLocalRandom; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.stream.Collectors; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.CatalogFamilyFormat; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtil; 039import org.apache.hadoop.hbase.MetaTableAccessor; 040import org.apache.hadoop.hbase.ServerName; 041import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 042import org.apache.hadoop.hbase.StartTestingClusterOption; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.UnknownRegionException; 045import org.apache.hadoop.hbase.client.Admin; 046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 047import org.apache.hadoop.hbase.client.DoNotRetryRegionException; 048import org.apache.hadoop.hbase.client.Put; 049import org.apache.hadoop.hbase.client.RegionInfo; 050import org.apache.hadoop.hbase.client.RegionReplicaUtil; 051import org.apache.hadoop.hbase.client.Result; 052import org.apache.hadoop.hbase.client.ResultScanner; 053import org.apache.hadoop.hbase.client.Scan; 054import org.apache.hadoop.hbase.client.Table; 055import org.apache.hadoop.hbase.client.TableDescriptor; 056import org.apache.hadoop.hbase.exceptions.MergeRegionException; 057import org.apache.hadoop.hbase.master.HMaster; 058import org.apache.hadoop.hbase.master.MasterRpcServices; 059import org.apache.hadoop.hbase.master.RegionState; 060import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 061import org.apache.hadoop.hbase.master.assignment.RegionStates; 062import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 063import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; 064import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 065import org.apache.hadoop.hbase.testclassification.LargeTests; 066import org.apache.hadoop.hbase.testclassification.RegionServerTests; 067import org.apache.hadoop.hbase.util.Bytes; 068import org.apache.hadoop.hbase.util.CommonFSUtils; 069import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 070import org.apache.hadoop.hbase.util.FutureUtils; 071import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 072import org.apache.hadoop.hbase.util.Pair; 073import org.apache.hadoop.hbase.util.PairOfSameType; 074import org.apache.hadoop.hbase.util.Threads; 075import org.apache.hadoop.util.StringUtils; 076import org.apache.zookeeper.KeeperException; 077import org.junit.AfterClass; 078import org.junit.BeforeClass; 079import org.junit.ClassRule; 080import org.junit.Rule; 081import org.junit.Test; 082import org.junit.experimental.categories.Category; 083import org.junit.rules.TestName; 084import org.slf4j.Logger; 085import org.slf4j.LoggerFactory; 086 087import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 088import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 089import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 090 091import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 093import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; 094 095@Category({ RegionServerTests.class, LargeTests.class }) 096public class TestRegionMergeTransactionOnCluster { 097 098 @ClassRule 099 public static final HBaseClassTestRule CLASS_RULE = 100 HBaseClassTestRule.forClass(TestRegionMergeTransactionOnCluster.class); 101 102 private static final Logger LOG = 103 LoggerFactory.getLogger(TestRegionMergeTransactionOnCluster.class); 104 105 @Rule 106 public TestName name = new TestName(); 107 108 private static final int NB_SERVERS = 3; 109 110 private static final byte[] FAMILYNAME = Bytes.toBytes("fam"); 111 private static final byte[] QUALIFIER = Bytes.toBytes("q"); 112 113 private static byte[] ROW = Bytes.toBytes("testRow"); 114 private static final int INITIAL_REGION_NUM = 10; 115 private static final int ROWSIZE = 200; 116 private static byte[][] ROWS = makeN(ROW, ROWSIZE); 117 118 private static int waitTime = 60 * 1000; 119 120 static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 121 122 private static HMaster MASTER; 123 private static Admin ADMIN; 124 125 @BeforeClass 126 public static void beforeAllTests() throws Exception { 127 // Start a cluster 128 StartTestingClusterOption option = StartTestingClusterOption.builder() 129 .masterClass(MyMaster.class).numRegionServers(NB_SERVERS).numDataNodes(NB_SERVERS).build(); 130 TEST_UTIL.startMiniCluster(option); 131 SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); 132 MASTER = cluster.getMaster(); 133 MASTER.balanceSwitch(false); 134 ADMIN = TEST_UTIL.getConnection().getAdmin(); 135 } 136 137 @AfterClass 138 public static void afterAllTests() throws Exception { 139 TEST_UTIL.shutdownMiniCluster(); 140 if (ADMIN != null) { 141 ADMIN.close(); 142 } 143 } 144 145 @Test 146 public void testWholesomeMerge() throws Exception { 147 LOG.info("Starting " + name.getMethodName()); 148 final TableName tableName = TableName.valueOf(name.getMethodName()); 149 150 try { 151 // Create table and load data. 152 Table table = createTableAndLoadData(MASTER, tableName); 153 // Merge 1st and 2nd region 154 mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1); 155 156 // Merge 2nd and 3th region 157 PairOfSameType<RegionInfo> mergedRegions = 158 mergeRegionsAndVerifyRegionNum(MASTER, tableName, 1, 2, INITIAL_REGION_NUM - 2); 159 160 verifyRowCount(table, ROWSIZE); 161 162 // Randomly choose one of the two merged regions 163 RegionInfo hri = ThreadLocalRandom.current().nextBoolean() 164 ? mergedRegions.getFirst() 165 : mergedRegions.getSecond(); 166 SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); 167 AssignmentManager am = cluster.getMaster().getAssignmentManager(); 168 RegionStates regionStates = am.getRegionStates(); 169 170 // We should not be able to assign it again 171 am.assign(hri); 172 assertFalse("Merged region can't be assigned", regionStates.isRegionInTransition(hri)); 173 174 // We should not be able to unassign it either 175 am.unassign(hri); 176 assertFalse("Merged region can't be unassigned", regionStates.isRegionInTransition(hri)); 177 178 table.close(); 179 } finally { 180 TEST_UTIL.deleteTable(tableName); 181 } 182 } 183 184 /** 185 * Not really restarting the master. Simulate it by clear of new region state since it is not 186 * persisted, will be lost after master restarts. 187 */ 188 @Test 189 public void testMergeAndRestartingMaster() throws Exception { 190 final TableName tableName = TableName.valueOf(name.getMethodName()); 191 192 try { 193 // Create table and load data. 194 Table table = createTableAndLoadData(MASTER, tableName); 195 196 try { 197 MyMasterRpcServices.enabled.set(true); 198 199 // Merge 1st and 2nd region 200 mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1); 201 } finally { 202 MyMasterRpcServices.enabled.set(false); 203 } 204 205 table.close(); 206 } finally { 207 TEST_UTIL.deleteTable(tableName); 208 } 209 } 210 211 @Test 212 public void testCleanMergeReference() throws Exception { 213 LOG.info("Starting " + name.getMethodName()); 214 ADMIN.catalogJanitorSwitch(false); 215 final TableName tableName = TableName.valueOf(name.getMethodName()); 216 try { 217 // Create table and load data. 218 Table table = createTableAndLoadData(MASTER, tableName); 219 // Merge 1st and 2nd region 220 mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1); 221 verifyRowCount(table, ROWSIZE); 222 table.close(); 223 224 List<Pair<RegionInfo, ServerName>> tableRegions = 225 MetaTableAccessor.getTableRegionsAndLocations(MASTER.getConnection(), tableName); 226 RegionInfo mergedRegionInfo = tableRegions.get(0).getFirst(); 227 TableDescriptor tableDescriptor = MASTER.getTableDescriptors().get(tableName); 228 Result mergedRegionResult = 229 MetaTableAccessor.getRegionResult(MASTER.getConnection(), mergedRegionInfo); 230 231 // contains merge reference in META 232 assertTrue(CatalogFamilyFormat.hasMergeRegions(mergedRegionResult.rawCells())); 233 234 // merging regions' directory are in the file system all the same 235 List<RegionInfo> p = CatalogFamilyFormat.getMergeRegions(mergedRegionResult.rawCells()); 236 RegionInfo regionA = p.get(0); 237 RegionInfo regionB = p.get(1); 238 FileSystem fs = MASTER.getMasterFileSystem().getFileSystem(); 239 Path rootDir = MASTER.getMasterFileSystem().getRootDir(); 240 241 Path tabledir = CommonFSUtils.getTableDir(rootDir, mergedRegionInfo.getTable()); 242 Path regionAdir = new Path(tabledir, regionA.getEncodedName()); 243 Path regionBdir = new Path(tabledir, regionB.getEncodedName()); 244 assertTrue(fs.exists(regionAdir)); 245 assertTrue(fs.exists(regionBdir)); 246 247 ColumnFamilyDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies(); 248 HRegionFileSystem hrfs = 249 new HRegionFileSystem(TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo); 250 int count = 0; 251 for (ColumnFamilyDescriptor colFamily : columnFamilies) { 252 StoreFileTracker sft = StoreFileTrackerFactory.create(TEST_UTIL.getConfiguration(), 253 tableDescriptor, colFamily, hrfs, false); 254 count += sft.load().size(); 255 } 256 ADMIN.compactRegion(mergedRegionInfo.getRegionName()); 257 // clean up the merged region store files 258 // wait until merged region have reference file 259 long timeout = EnvironmentEdgeManager.currentTime() + waitTime; 260 int newcount = 0; 261 while (EnvironmentEdgeManager.currentTime() < timeout) { 262 for (ColumnFamilyDescriptor colFamily : columnFamilies) { 263 StoreFileTracker sft = StoreFileTrackerFactory.create(TEST_UTIL.getConfiguration(), 264 tableDescriptor, colFamily, hrfs, false); 265 newcount += sft.load().size(); 266 } 267 if (newcount > count) { 268 break; 269 } 270 Thread.sleep(50); 271 } 272 assertTrue(newcount > count); 273 List<RegionServerThread> regionServerThreads = 274 TEST_UTIL.getHBaseCluster().getRegionServerThreads(); 275 for (RegionServerThread rs : regionServerThreads) { 276 CompactedHFilesDischarger cleaner = 277 new CompactedHFilesDischarger(100, null, rs.getRegionServer(), false); 278 cleaner.chore(); 279 Thread.sleep(1000); 280 } 281 while (EnvironmentEdgeManager.currentTime() < timeout) { 282 int newcount1 = 0; 283 for (ColumnFamilyDescriptor colFamily : columnFamilies) { 284 StoreFileTracker sft = StoreFileTrackerFactory.create(TEST_UTIL.getConfiguration(), 285 tableDescriptor, colFamily, hrfs, false); 286 newcount1 += sft.load().size(); 287 } 288 if (newcount1 <= 1) { 289 break; 290 } 291 Thread.sleep(50); 292 } 293 // run CatalogJanitor to clean merge references in hbase:meta and archive the 294 // files of merging regions 295 int cleaned = 0; 296 while (cleaned == 0) { 297 cleaned = ADMIN.runCatalogJanitor(); 298 LOG.debug("catalog janitor returned " + cleaned); 299 Thread.sleep(50); 300 // Cleanup is async so wait till all procedures are done running. 301 ProcedureTestingUtility.waitNoProcedureRunning( 302 TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor()); 303 } 304 // We used to check for existence of region in fs but sometimes the region dir was 305 // cleaned up by the time we got here making the test sometimes flakey. 306 assertTrue(cleaned > 0); 307 308 // Wait around a bit to give stuff a chance to complete. 309 while (true) { 310 mergedRegionResult = 311 MetaTableAccessor.getRegionResult(TEST_UTIL.getConnection(), mergedRegionInfo); 312 if (CatalogFamilyFormat.hasMergeRegions(mergedRegionResult.rawCells())) { 313 LOG.info("Waiting on cleanup of merge columns {}", 314 Arrays.asList(mergedRegionResult.rawCells()).stream().map(c -> c.toString()) 315 .collect(Collectors.joining(","))); 316 Threads.sleep(50); 317 } else { 318 break; 319 } 320 } 321 assertFalse(CatalogFamilyFormat.hasMergeRegions(mergedRegionResult.rawCells())); 322 } finally { 323 ADMIN.catalogJanitorSwitch(true); 324 TEST_UTIL.deleteTable(tableName); 325 } 326 } 327 328 /** 329 * This test tests 1, merging region not online; 2, merging same two regions; 3, merging unknown 330 * regions. They are in one test case so that we don't have to create many tables, and these tests 331 * are simple. 332 */ 333 @Test 334 public void testMerge() throws Exception { 335 LOG.info("Starting " + name.getMethodName()); 336 final TableName tableName = TableName.valueOf(name.getMethodName()); 337 final Admin admin = TEST_UTIL.getAdmin(); 338 339 try { 340 // Create table and load data. 341 Table table = createTableAndLoadData(MASTER, tableName); 342 AssignmentManager am = MASTER.getAssignmentManager(); 343 List<RegionInfo> regions = am.getRegionStates().getRegionsOfTable(tableName); 344 // Fake offline one region 345 RegionInfo a = regions.get(0); 346 RegionInfo b = regions.get(1); 347 am.unassign(b); 348 am.offlineRegion(b); 349 try { 350 // Merge offline region. Region a is offline here 351 FutureUtils.get( 352 admin.mergeRegionsAsync(a.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), false)); 353 fail("Offline regions should not be able to merge"); 354 } catch (DoNotRetryRegionException ie) { 355 System.out.println(ie); 356 assertTrue(ie instanceof MergeRegionException); 357 } 358 359 try { 360 // Merge the same region: b and b. 361 FutureUtils 362 .get(admin.mergeRegionsAsync(b.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), true)); 363 fail("A region should not be able to merge with itself, even forcfully"); 364 } catch (IOException ie) { 365 assertTrue("Exception should mention regions not online", 366 StringUtils.stringifyException(ie).contains("region to itself") 367 && ie instanceof MergeRegionException); 368 } 369 370 try { 371 // Merge unknown regions 372 FutureUtils.get(admin.mergeRegionsAsync(Bytes.toBytes("-f1"), Bytes.toBytes("-f2"), true)); 373 fail("Unknown region could not be merged"); 374 } catch (IOException ie) { 375 assertTrue("UnknownRegionException should be thrown", ie instanceof UnknownRegionException); 376 } 377 table.close(); 378 } finally { 379 TEST_UTIL.deleteTable(tableName); 380 } 381 } 382 383 @Test 384 public void testMergeWithReplicas() throws Exception { 385 final TableName tableName = TableName.valueOf(name.getMethodName()); 386 try { 387 // Create table and load data. 388 Table table = createTableAndLoadData(MASTER, tableName, 5, 2); 389 List<Pair<RegionInfo, ServerName>> initialRegionToServers = 390 MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tableName); 391 // Merge 1st and 2nd region 392 PairOfSameType<RegionInfo> mergedRegions = 393 mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 2, 5 * 2 - 2); 394 List<Pair<RegionInfo, ServerName>> currentRegionToServers = 395 MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tableName); 396 List<RegionInfo> initialRegions = new ArrayList<>(); 397 for (Pair<RegionInfo, ServerName> p : initialRegionToServers) { 398 initialRegions.add(p.getFirst()); 399 } 400 List<RegionInfo> currentRegions = new ArrayList<>(); 401 for (Pair<RegionInfo, ServerName> p : currentRegionToServers) { 402 currentRegions.add(p.getFirst()); 403 } 404 // this is the first region 405 assertTrue(initialRegions.contains(mergedRegions.getFirst())); 406 // this is the replica of the first region 407 assertTrue(initialRegions 408 .contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getFirst(), 1))); 409 // this is the second region 410 assertTrue(initialRegions.contains(mergedRegions.getSecond())); 411 // this is the replica of the second region 412 assertTrue(initialRegions 413 .contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getSecond(), 1))); 414 // this is the new region 415 assertTrue(!initialRegions.contains(currentRegions.get(0))); 416 // replica of the new region 417 assertTrue(!initialRegions 418 .contains(RegionReplicaUtil.getRegionInfoForReplica(currentRegions.get(0), 1))); 419 // replica of the new region 420 assertTrue(currentRegions 421 .contains(RegionReplicaUtil.getRegionInfoForReplica(currentRegions.get(0), 1))); 422 // replica of the merged region 423 assertTrue(!currentRegions 424 .contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getFirst(), 1))); 425 // replica of the merged region 426 assertTrue(!currentRegions 427 .contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getSecond(), 1))); 428 table.close(); 429 } finally { 430 TEST_UTIL.deleteTable(tableName); 431 } 432 } 433 434 private PairOfSameType<RegionInfo> mergeRegionsAndVerifyRegionNum(HMaster master, 435 TableName tablename, int regionAnum, int regionBnum, int expectedRegionNum) throws Exception { 436 PairOfSameType<RegionInfo> mergedRegions = 437 requestMergeRegion(master, tablename, regionAnum, regionBnum); 438 waitAndVerifyRegionNum(master, tablename, expectedRegionNum); 439 return mergedRegions; 440 } 441 442 private PairOfSameType<RegionInfo> requestMergeRegion(HMaster master, TableName tablename, 443 int regionAnum, int regionBnum) throws Exception { 444 List<Pair<RegionInfo, ServerName>> tableRegions = 445 MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename); 446 RegionInfo regionA = tableRegions.get(regionAnum).getFirst(); 447 RegionInfo regionB = tableRegions.get(regionBnum).getFirst(); 448 ADMIN.mergeRegionsAsync(regionA.getEncodedNameAsBytes(), regionB.getEncodedNameAsBytes(), 449 false); 450 return new PairOfSameType<>(regionA, regionB); 451 } 452 453 private void waitAndVerifyRegionNum(HMaster master, TableName tablename, int expectedRegionNum) 454 throws Exception { 455 List<Pair<RegionInfo, ServerName>> tableRegionsInMeta; 456 List<RegionInfo> tableRegionsInMaster; 457 long timeout = EnvironmentEdgeManager.currentTime() + waitTime; 458 while (EnvironmentEdgeManager.currentTime() < timeout) { 459 tableRegionsInMeta = 460 MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename); 461 tableRegionsInMaster = 462 master.getAssignmentManager().getRegionStates().getRegionsOfTable(tablename); 463 LOG.info(Objects.toString(tableRegionsInMaster)); 464 LOG.info(Objects.toString(tableRegionsInMeta)); 465 int tableRegionsInMetaSize = tableRegionsInMeta.size(); 466 int tableRegionsInMasterSize = tableRegionsInMaster.size(); 467 if ( 468 tableRegionsInMetaSize == expectedRegionNum && tableRegionsInMasterSize == expectedRegionNum 469 ) { 470 break; 471 } 472 Thread.sleep(250); 473 } 474 475 tableRegionsInMeta = 476 MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename); 477 LOG.info("Regions after merge:" + Joiner.on(',').join(tableRegionsInMeta)); 478 assertEquals(expectedRegionNum, tableRegionsInMeta.size()); 479 } 480 481 private Table createTableAndLoadData(HMaster master, TableName tablename) throws Exception { 482 return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM, 1); 483 } 484 485 private Table createTableAndLoadData(HMaster master, TableName tablename, int numRegions, 486 int replication) throws Exception { 487 assertTrue("ROWSIZE must > numregions:" + numRegions, ROWSIZE > numRegions); 488 byte[][] splitRows = new byte[numRegions - 1][]; 489 for (int i = 0; i < splitRows.length; i++) { 490 splitRows[i] = ROWS[(i + 1) * ROWSIZE / numRegions]; 491 } 492 493 Table table = TEST_UTIL.createTable(tablename, FAMILYNAME, splitRows); 494 LOG.info("Created " + table.getName()); 495 if (replication > 1) { 496 HBaseTestingUtil.setReplicas(ADMIN, tablename, replication); 497 LOG.info("Set replication of " + replication + " on " + table.getName()); 498 } 499 loadData(table); 500 LOG.info("Loaded " + table.getName()); 501 verifyRowCount(table, ROWSIZE); 502 LOG.info("Verified " + table.getName()); 503 504 List<Pair<RegionInfo, ServerName>> tableRegions; 505 TEST_UTIL.waitUntilAllRegionsAssigned(tablename); 506 LOG.info("All regions assigned for table - " + table.getName()); 507 tableRegions = 508 MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename); 509 assertEquals("Wrong number of regions in table " + tablename, numRegions * replication, 510 tableRegions.size()); 511 LOG.info(tableRegions.size() + "Regions after load: " + Joiner.on(',').join(tableRegions)); 512 assertEquals(numRegions * replication, tableRegions.size()); 513 return table; 514 } 515 516 private static byte[][] makeN(byte[] base, int n) { 517 byte[][] ret = new byte[n][]; 518 for (int i = 0; i < n; i++) { 519 ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i))); 520 } 521 return ret; 522 } 523 524 private void loadData(Table table) throws IOException { 525 for (int i = 0; i < ROWSIZE; i++) { 526 Put put = new Put(ROWS[i]); 527 put.addColumn(FAMILYNAME, QUALIFIER, Bytes.toBytes(i)); 528 table.put(put); 529 } 530 } 531 532 private void verifyRowCount(Table table, int expectedRegionNum) throws IOException { 533 ResultScanner scanner = table.getScanner(new Scan()); 534 int rowCount = 0; 535 while (scanner.next() != null) { 536 rowCount++; 537 } 538 assertEquals(expectedRegionNum, rowCount); 539 scanner.close(); 540 } 541 542 // Make it public so that JVMClusterUtil can access it. 543 public static class MyMaster extends HMaster { 544 public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException { 545 super(conf); 546 } 547 548 @Override 549 protected MasterRpcServices createRpcServices() throws IOException { 550 return new MyMasterRpcServices(this); 551 } 552 } 553 554 static class MyMasterRpcServices extends MasterRpcServices { 555 static AtomicBoolean enabled = new AtomicBoolean(false); 556 557 private HMaster myMaster; 558 559 public MyMasterRpcServices(HMaster master) throws IOException { 560 super(master); 561 myMaster = master; 562 } 563 564 @Override 565 public ReportRegionStateTransitionResponse reportRegionStateTransition(RpcController c, 566 ReportRegionStateTransitionRequest req) throws ServiceException { 567 ReportRegionStateTransitionResponse resp = super.reportRegionStateTransition(c, req); 568 if ( 569 enabled.get() && req.getTransition(0).getTransitionCode() == TransitionCode.READY_TO_MERGE 570 && !resp.hasErrorMessage() 571 ) { 572 RegionStates regionStates = myMaster.getAssignmentManager().getRegionStates(); 573 for (RegionState regionState : regionStates.getRegionsStateInTransition()) { 574 // Find the merging_new region and remove it 575 if (regionState.isMergingNew()) { 576 regionStates.deleteRegion(regionState.getRegion()); 577 } 578 } 579 } 580 return resp; 581 } 582 } 583}