001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.List;
029import java.util.Objects;
030import java.util.concurrent.ThreadLocalRandom;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.stream.Collectors;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.MetaTableAccessor;
040import org.apache.hadoop.hbase.MiniHBaseCluster;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.StartMiniClusterOption;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.UnknownRegionException;
045import org.apache.hadoop.hbase.client.Admin;
046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
047import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
048import org.apache.hadoop.hbase.client.Put;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.client.RegionReplicaUtil;
051import org.apache.hadoop.hbase.client.Result;
052import org.apache.hadoop.hbase.client.ResultScanner;
053import org.apache.hadoop.hbase.client.Scan;
054import org.apache.hadoop.hbase.client.Table;
055import org.apache.hadoop.hbase.client.TableDescriptor;
056import org.apache.hadoop.hbase.exceptions.MergeRegionException;
057import org.apache.hadoop.hbase.master.HMaster;
058import org.apache.hadoop.hbase.master.MasterRpcServices;
059import org.apache.hadoop.hbase.master.RegionState;
060import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
061import org.apache.hadoop.hbase.master.assignment.RegionStates;
062import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
063import org.apache.hadoop.hbase.testclassification.LargeTests;
064import org.apache.hadoop.hbase.testclassification.RegionServerTests;
065import org.apache.hadoop.hbase.util.Bytes;
066import org.apache.hadoop.hbase.util.CommonFSUtils;
067import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
068import org.apache.hadoop.hbase.util.FutureUtils;
069import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
070import org.apache.hadoop.hbase.util.Pair;
071import org.apache.hadoop.hbase.util.PairOfSameType;
072import org.apache.hadoop.hbase.util.Threads;
073import org.apache.hadoop.util.StringUtils;
074import org.apache.zookeeper.KeeperException;
075import org.junit.AfterClass;
076import org.junit.BeforeClass;
077import org.junit.ClassRule;
078import org.junit.Rule;
079import org.junit.Test;
080import org.junit.experimental.categories.Category;
081import org.junit.rules.TestName;
082import org.slf4j.Logger;
083import org.slf4j.LoggerFactory;
084
085import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
086import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
087import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
088
089import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
092
093@Category({ RegionServerTests.class, LargeTests.class })
094public class TestRegionMergeTransactionOnCluster {
095
096  @ClassRule
097  public static final HBaseClassTestRule CLASS_RULE =
098    HBaseClassTestRule.forClass(TestRegionMergeTransactionOnCluster.class);
099
100  private static final Logger LOG =
101    LoggerFactory.getLogger(TestRegionMergeTransactionOnCluster.class);
102
103  @Rule
104  public TestName name = new TestName();
105
106  private static final int NB_SERVERS = 3;
107
108  private static final byte[] FAMILYNAME = Bytes.toBytes("fam");
109  private static final byte[] QUALIFIER = Bytes.toBytes("q");
110
111  private static byte[] ROW = Bytes.toBytes("testRow");
112  private static final int INITIAL_REGION_NUM = 10;
113  private static final int ROWSIZE = 200;
114  private static byte[][] ROWS = makeN(ROW, ROWSIZE);
115
116  private static int waitTime = 60 * 1000;
117
118  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
119
120  private static HMaster MASTER;
121  private static Admin ADMIN;
122
123  @BeforeClass
124  public static void beforeAllTests() throws Exception {
125    // Start a cluster
126    StartMiniClusterOption option = StartMiniClusterOption.builder().masterClass(MyMaster.class)
127      .numRegionServers(NB_SERVERS).numDataNodes(NB_SERVERS).build();
128    TEST_UTIL.startMiniCluster(option);
129    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
130    MASTER = cluster.getMaster();
131    MASTER.balanceSwitch(false);
132    ADMIN = TEST_UTIL.getConnection().getAdmin();
133  }
134
135  @AfterClass
136  public static void afterAllTests() throws Exception {
137    TEST_UTIL.shutdownMiniCluster();
138    if (ADMIN != null) {
139      ADMIN.close();
140    }
141  }
142
143  @Test
144  public void testWholesomeMerge() throws Exception {
145    LOG.info("Starting " + name.getMethodName());
146    final TableName tableName = TableName.valueOf(name.getMethodName());
147
148    try {
149      // Create table and load data.
150      Table table = createTableAndLoadData(MASTER, tableName);
151      // Merge 1st and 2nd region
152      mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1);
153
154      // Merge 2nd and 3th region
155      PairOfSameType<RegionInfo> mergedRegions =
156        mergeRegionsAndVerifyRegionNum(MASTER, tableName, 1, 2, INITIAL_REGION_NUM - 2);
157
158      verifyRowCount(table, ROWSIZE);
159
160      // Randomly choose one of the two merged regions
161      RegionInfo hri = ThreadLocalRandom.current().nextBoolean()
162        ? mergedRegions.getFirst()
163        : mergedRegions.getSecond();
164      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
165      AssignmentManager am = cluster.getMaster().getAssignmentManager();
166      RegionStates regionStates = am.getRegionStates();
167
168      // We should not be able to assign it again
169      am.assign(hri);
170      assertFalse("Merged region can't be assigned", regionStates.isRegionInTransition(hri));
171
172      // We should not be able to unassign it either
173      am.unassign(hri);
174      assertFalse("Merged region can't be unassigned", regionStates.isRegionInTransition(hri));
175
176      table.close();
177    } finally {
178      TEST_UTIL.deleteTable(tableName);
179    }
180  }
181
182  /**
183   * Not really restarting the master. Simulate it by clear of new region state since it is not
184   * persisted, will be lost after master restarts.
185   */
186  @Test
187  public void testMergeAndRestartingMaster() throws Exception {
188    final TableName tableName = TableName.valueOf(name.getMethodName());
189
190    try {
191      // Create table and load data.
192      Table table = createTableAndLoadData(MASTER, tableName);
193
194      try {
195        MyMasterRpcServices.enabled.set(true);
196
197        // Merge 1st and 2nd region
198        mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1);
199      } finally {
200        MyMasterRpcServices.enabled.set(false);
201      }
202
203      table.close();
204    } finally {
205      TEST_UTIL.deleteTable(tableName);
206    }
207  }
208
209  @Test
210  public void testCleanMergeReference() throws Exception {
211    LOG.info("Starting " + name.getMethodName());
212    ADMIN.enableCatalogJanitor(false);
213    final TableName tableName = TableName.valueOf(name.getMethodName());
214    try {
215      // Create table and load data.
216      Table table = createTableAndLoadData(MASTER, tableName);
217      // Merge 1st and 2nd region
218      mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1);
219      verifyRowCount(table, ROWSIZE);
220      table.close();
221
222      List<Pair<RegionInfo, ServerName>> tableRegions =
223        MetaTableAccessor.getTableRegionsAndLocations(MASTER.getConnection(), tableName);
224      RegionInfo mergedRegionInfo = tableRegions.get(0).getFirst();
225      TableDescriptor tableDescriptor = MASTER.getTableDescriptors().get(tableName);
226      Result mergedRegionResult =
227        MetaTableAccessor.getRegionResult(MASTER.getConnection(), mergedRegionInfo);
228
229      // contains merge reference in META
230      assertTrue(MetaTableAccessor.hasMergeRegions(mergedRegionResult.rawCells()));
231
232      // merging regions' directory are in the file system all the same
233      List<RegionInfo> p = MetaTableAccessor.getMergeRegions(mergedRegionResult.rawCells());
234      RegionInfo regionA = p.get(0);
235      RegionInfo regionB = p.get(1);
236      FileSystem fs = MASTER.getMasterFileSystem().getFileSystem();
237      Path rootDir = MASTER.getMasterFileSystem().getRootDir();
238
239      Path tabledir = CommonFSUtils.getTableDir(rootDir, mergedRegionInfo.getTable());
240      Path regionAdir = new Path(tabledir, regionA.getEncodedName());
241      Path regionBdir = new Path(tabledir, regionB.getEncodedName());
242      assertTrue(fs.exists(regionAdir));
243      assertTrue(fs.exists(regionBdir));
244
245      ColumnFamilyDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();
246      HRegionFileSystem hrfs =
247        new HRegionFileSystem(TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
248      int count = 0;
249      for (ColumnFamilyDescriptor colFamily : columnFamilies) {
250        count += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
251      }
252      ADMIN.compactRegion(mergedRegionInfo.getRegionName());
253      // clean up the merged region store files
254      // wait until merged region have reference file
255      long timeout = EnvironmentEdgeManager.currentTime() + waitTime;
256      int newcount = 0;
257      while (EnvironmentEdgeManager.currentTime() < timeout) {
258        for (ColumnFamilyDescriptor colFamily : columnFamilies) {
259          newcount += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
260        }
261        if (newcount > count) {
262          break;
263        }
264        Thread.sleep(50);
265      }
266      assertTrue(newcount > count);
267      List<RegionServerThread> regionServerThreads =
268        TEST_UTIL.getHBaseCluster().getRegionServerThreads();
269      for (RegionServerThread rs : regionServerThreads) {
270        CompactedHFilesDischarger cleaner =
271          new CompactedHFilesDischarger(100, null, rs.getRegionServer(), false);
272        cleaner.chore();
273        Thread.sleep(1000);
274      }
275      while (EnvironmentEdgeManager.currentTime() < timeout) {
276        int newcount1 = 0;
277        for (ColumnFamilyDescriptor colFamily : columnFamilies) {
278          newcount1 += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
279        }
280        if (newcount1 <= 1) {
281          break;
282        }
283        Thread.sleep(50);
284      }
285      // run CatalogJanitor to clean merge references in hbase:meta and archive the
286      // files of merging regions
287      int cleaned = 0;
288      while (cleaned == 0) {
289        cleaned = ADMIN.runCatalogScan();
290        LOG.debug("catalog janitor returned " + cleaned);
291        Thread.sleep(50);
292        // Cleanup is async so wait till all procedures are done running.
293        ProcedureTestingUtility.waitNoProcedureRunning(
294          TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor());
295      }
296      // We used to check for existence of region in fs but sometimes the region dir was
297      // cleaned up by the time we got here making the test sometimes flakey.
298      assertTrue(cleaned > 0);
299
300      // Wait around a bit to give stuff a chance to complete.
301      while (true) {
302        mergedRegionResult =
303          MetaTableAccessor.getRegionResult(TEST_UTIL.getConnection(), mergedRegionInfo);
304        if (MetaTableAccessor.hasMergeRegions(mergedRegionResult.rawCells())) {
305          LOG.info("Waiting on cleanup of merge columns {}",
306            Arrays.asList(mergedRegionResult.rawCells()).stream().map(c -> c.toString())
307              .collect(Collectors.joining(",")));
308          Threads.sleep(50);
309        } else {
310          break;
311        }
312      }
313      assertFalse(MetaTableAccessor.hasMergeRegions(mergedRegionResult.rawCells()));
314    } finally {
315      ADMIN.enableCatalogJanitor(true);
316      TEST_UTIL.deleteTable(tableName);
317    }
318  }
319
320  /**
321   * This test tests 1, merging region not online; 2, merging same two regions; 3, merging unknown
322   * regions. They are in one test case so that we don't have to create many tables, and these tests
323   * are simple.
324   */
325  @Test
326  public void testMerge() throws Exception {
327    LOG.info("Starting " + name.getMethodName());
328    final TableName tableName = TableName.valueOf(name.getMethodName());
329    final Admin admin = TEST_UTIL.getAdmin();
330    final int syncWaitTimeout = 10 * 60000; // 10min
331
332    try {
333      // Create table and load data.
334      Table table = createTableAndLoadData(MASTER, tableName);
335      AssignmentManager am = MASTER.getAssignmentManager();
336      List<RegionInfo> regions = am.getRegionStates().getRegionsOfTable(tableName);
337      // Fake offline one region
338      RegionInfo a = regions.get(0);
339      RegionInfo b = regions.get(1);
340      am.unassign(b);
341      am.offlineRegion(b);
342      try {
343        // Merge offline region. Region a is offline here
344        admin.mergeRegionsAsync(a.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), false)
345          .get(syncWaitTimeout, TimeUnit.MILLISECONDS);
346        fail("Offline regions should not be able to merge");
347      } catch (DoNotRetryRegionException ie) {
348        System.out.println(ie);
349        assertTrue(ie instanceof MergeRegionException);
350      }
351
352      try {
353        // Merge the same region: b and b.
354        FutureUtils
355          .get(admin.mergeRegionsAsync(b.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), true));
356        fail("A region should not be able to merge with itself, even forcfully");
357      } catch (IOException ie) {
358        assertTrue("Exception should mention regions not online",
359          StringUtils.stringifyException(ie).contains("region to itself")
360            && ie instanceof MergeRegionException);
361      }
362
363      try {
364        // Merge unknown regions
365        admin.mergeRegionsAsync(Bytes.toBytes("-f1"), Bytes.toBytes("-f2"), true);
366        fail("Unknown region could not be merged");
367      } catch (IOException ie) {
368        assertTrue("UnknownRegionException should be thrown", ie instanceof UnknownRegionException);
369      }
370      table.close();
371    } finally {
372      TEST_UTIL.deleteTable(tableName);
373    }
374  }
375
376  @Test
377  public void testMergeWithReplicas() throws Exception {
378    final TableName tableName = TableName.valueOf(name.getMethodName());
379    try {
380      // Create table and load data.
381      Table table = createTableAndLoadData(MASTER, tableName, 5, 2);
382      List<Pair<RegionInfo, ServerName>> initialRegionToServers =
383        MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tableName);
384      // Merge 1st and 2nd region
385      PairOfSameType<RegionInfo> mergedRegions =
386        mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 2, 5 * 2 - 2);
387      List<Pair<RegionInfo, ServerName>> currentRegionToServers =
388        MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tableName);
389      List<RegionInfo> initialRegions = new ArrayList<>();
390      for (Pair<RegionInfo, ServerName> p : initialRegionToServers) {
391        initialRegions.add(p.getFirst());
392      }
393      List<RegionInfo> currentRegions = new ArrayList<>();
394      for (Pair<RegionInfo, ServerName> p : currentRegionToServers) {
395        currentRegions.add(p.getFirst());
396      }
397      // this is the first region
398      assertTrue(initialRegions.contains(mergedRegions.getFirst()));
399      // this is the replica of the first region
400      assertTrue(initialRegions
401        .contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getFirst(), 1)));
402      // this is the second region
403      assertTrue(initialRegions.contains(mergedRegions.getSecond()));
404      // this is the replica of the second region
405      assertTrue(initialRegions
406        .contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getSecond(), 1)));
407      // this is the new region
408      assertTrue(!initialRegions.contains(currentRegions.get(0)));
409      // replica of the new region
410      assertTrue(!initialRegions
411        .contains(RegionReplicaUtil.getRegionInfoForReplica(currentRegions.get(0), 1)));
412      // replica of the new region
413      assertTrue(currentRegions
414        .contains(RegionReplicaUtil.getRegionInfoForReplica(currentRegions.get(0), 1)));
415      // replica of the merged region
416      assertTrue(!currentRegions
417        .contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getFirst(), 1)));
418      // replica of the merged region
419      assertTrue(!currentRegions
420        .contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getSecond(), 1)));
421      table.close();
422    } finally {
423      TEST_UTIL.deleteTable(tableName);
424    }
425  }
426
427  private PairOfSameType<RegionInfo> mergeRegionsAndVerifyRegionNum(HMaster master,
428    TableName tablename, int regionAnum, int regionBnum, int expectedRegionNum) throws Exception {
429    PairOfSameType<RegionInfo> mergedRegions =
430      requestMergeRegion(master, tablename, regionAnum, regionBnum);
431    waitAndVerifyRegionNum(master, tablename, expectedRegionNum);
432    return mergedRegions;
433  }
434
435  private PairOfSameType<RegionInfo> requestMergeRegion(HMaster master, TableName tablename,
436    int regionAnum, int regionBnum) throws Exception {
437    List<Pair<RegionInfo, ServerName>> tableRegions =
438      MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename);
439    RegionInfo regionA = tableRegions.get(regionAnum).getFirst();
440    RegionInfo regionB = tableRegions.get(regionBnum).getFirst();
441    ADMIN.mergeRegionsAsync(regionA.getEncodedNameAsBytes(), regionB.getEncodedNameAsBytes(),
442      false);
443    return new PairOfSameType<>(regionA, regionB);
444  }
445
446  private void waitAndVerifyRegionNum(HMaster master, TableName tablename, int expectedRegionNum)
447    throws Exception {
448    List<Pair<RegionInfo, ServerName>> tableRegionsInMeta;
449    List<RegionInfo> tableRegionsInMaster;
450    long timeout = EnvironmentEdgeManager.currentTime() + waitTime;
451    while (EnvironmentEdgeManager.currentTime() < timeout) {
452      tableRegionsInMeta =
453        MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename);
454      tableRegionsInMaster =
455        master.getAssignmentManager().getRegionStates().getRegionsOfTable(tablename);
456      LOG.info(Objects.toString(tableRegionsInMaster));
457      LOG.info(Objects.toString(tableRegionsInMeta));
458      int tableRegionsInMetaSize = tableRegionsInMeta.size();
459      int tableRegionsInMasterSize = tableRegionsInMaster.size();
460      if (
461        tableRegionsInMetaSize == expectedRegionNum && tableRegionsInMasterSize == expectedRegionNum
462      ) {
463        break;
464      }
465      Thread.sleep(250);
466    }
467
468    tableRegionsInMeta =
469      MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename);
470    LOG.info("Regions after merge:" + Joiner.on(',').join(tableRegionsInMeta));
471    assertEquals(expectedRegionNum, tableRegionsInMeta.size());
472  }
473
474  private Table createTableAndLoadData(HMaster master, TableName tablename) throws Exception {
475    return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM, 1);
476  }
477
478  private Table createTableAndLoadData(HMaster master, TableName tablename, int numRegions,
479    int replication) throws Exception {
480    assertTrue("ROWSIZE must > numregions:" + numRegions, ROWSIZE > numRegions);
481    byte[][] splitRows = new byte[numRegions - 1][];
482    for (int i = 0; i < splitRows.length; i++) {
483      splitRows[i] = ROWS[(i + 1) * ROWSIZE / numRegions];
484    }
485
486    Table table = TEST_UTIL.createTable(tablename, FAMILYNAME, splitRows);
487    LOG.info("Created " + table.getName());
488    if (replication > 1) {
489      HBaseTestingUtility.setReplicas(ADMIN, tablename, replication);
490      LOG.info("Set replication of " + replication + " on " + table.getName());
491    }
492    loadData(table);
493    LOG.info("Loaded " + table.getName());
494    verifyRowCount(table, ROWSIZE);
495    LOG.info("Verified " + table.getName());
496
497    List<Pair<RegionInfo, ServerName>> tableRegions;
498    TEST_UTIL.waitUntilAllRegionsAssigned(tablename);
499    LOG.info("All regions assigned for table - " + table.getName());
500    tableRegions =
501      MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename);
502    assertEquals("Wrong number of regions in table " + tablename, numRegions * replication,
503      tableRegions.size());
504    LOG.info(tableRegions.size() + "Regions after load: " + Joiner.on(',').join(tableRegions));
505    assertEquals(numRegions * replication, tableRegions.size());
506    return table;
507  }
508
509  private static byte[][] makeN(byte[] base, int n) {
510    byte[][] ret = new byte[n][];
511    for (int i = 0; i < n; i++) {
512      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
513    }
514    return ret;
515  }
516
517  private void loadData(Table table) throws IOException {
518    for (int i = 0; i < ROWSIZE; i++) {
519      Put put = new Put(ROWS[i]);
520      put.addColumn(FAMILYNAME, QUALIFIER, Bytes.toBytes(i));
521      table.put(put);
522    }
523  }
524
525  private void verifyRowCount(Table table, int expectedRegionNum) throws IOException {
526    ResultScanner scanner = table.getScanner(new Scan());
527    int rowCount = 0;
528    while (scanner.next() != null) {
529      rowCount++;
530    }
531    assertEquals(expectedRegionNum, rowCount);
532    scanner.close();
533  }
534
535  // Make it public so that JVMClusterUtil can access it.
536  public static class MyMaster extends HMaster {
537    public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
538      super(conf);
539    }
540
541    @Override
542    protected RSRpcServices createRpcServices() throws IOException {
543      return new MyMasterRpcServices(this);
544    }
545  }
546
547  static class MyMasterRpcServices extends MasterRpcServices {
548    static AtomicBoolean enabled = new AtomicBoolean(false);
549
550    private HMaster myMaster;
551
552    public MyMasterRpcServices(HMaster master) throws IOException {
553      super(master);
554      myMaster = master;
555    }
556
557    @Override
558    public ReportRegionStateTransitionResponse reportRegionStateTransition(RpcController c,
559      ReportRegionStateTransitionRequest req) throws ServiceException {
560      ReportRegionStateTransitionResponse resp = super.reportRegionStateTransition(c, req);
561      if (
562        enabled.get() && req.getTransition(0).getTransitionCode() == TransitionCode.READY_TO_MERGE
563          && !resp.hasErrorMessage()
564      ) {
565        RegionStates regionStates = myMaster.getAssignmentManager().getRegionStates();
566        for (RegionState regionState : regionStates.getRegionsStateInTransition()) {
567          // Find the merging_new region and remove it
568          if (regionState.isMergingNew()) {
569            regionStates.deleteRegion(regionState.getRegion());
570          }
571        }
572      }
573      return resp;
574    }
575  }
576}