001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.regionreplication; 019 020import static org.junit.Assert.assertNotNull; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.Collection; 025import java.util.List; 026import java.util.Optional; 027import java.util.concurrent.CyclicBarrier; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.concurrent.atomic.AtomicInteger; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.DoNotRetryIOException; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.NotServingRegionException; 038import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 039import org.apache.hadoop.hbase.StartTestingClusterOption; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.RegionInfo; 044import org.apache.hadoop.hbase.client.RegionReplicaUtil; 045import org.apache.hadoop.hbase.client.TableDescriptor; 046import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 047import org.apache.hadoop.hbase.monitoring.MonitoredTask; 048import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 049import org.apache.hadoop.hbase.regionserver.HRegion; 050import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; 051import org.apache.hadoop.hbase.regionserver.HRegionServer; 052import org.apache.hadoop.hbase.regionserver.HStore; 053import org.apache.hadoop.hbase.regionserver.RSRpcServices; 054import org.apache.hadoop.hbase.regionserver.Region; 055import org.apache.hadoop.hbase.regionserver.RegionServerServices; 056import org.apache.hadoop.hbase.testclassification.LargeTests; 057import org.apache.hadoop.hbase.testclassification.RegionServerTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 060import org.apache.hadoop.hbase.wal.WAL; 061import org.junit.AfterClass; 062import org.junit.BeforeClass; 063import org.junit.ClassRule; 064import org.junit.Test; 065import org.junit.experimental.categories.Category; 066import org.mockito.Mockito; 067 068import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 069import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 070import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 071import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 072 073import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 076 077@Category({ RegionServerTests.class, LargeTests.class }) 078public class TestRegionReplicationSinkCallbackAndFlushConcurrently { 079 @ClassRule 080 public static final HBaseClassTestRule CLASS_RULE = 081 HBaseClassTestRule.forClass(TestRegionReplicationSinkCallbackAndFlushConcurrently.class); 082 083 private static final byte[] FAMILY = Bytes.toBytes("family_test"); 084 085 private static final byte[] QUAL = Bytes.toBytes("qualifier_test"); 086 087 private static final HBaseTestingUtil HTU = new HBaseTestingUtil(); 088 private static final int NB_SERVERS = 2; 089 090 private static TableName tableName = TableName.valueOf("testRegionReplicationSinkSuspend"); 091 private static volatile boolean startTest = false; 092 093 @BeforeClass 094 public static void setUp() throws Exception { 095 Configuration conf = HTU.getConfiguration(); 096 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); 097 conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class); 098 conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 15); 099 conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 10 * 60 * 1000); 100 conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 20 * 60 * 1000); 101 conf.setLong(RegionReplicationSink.META_EDIT_RPC_TIMEOUT_MS, 10 * 60 * 1000); 102 conf.setLong(RegionReplicationSink.META_EDIT_OPERATION_TIMEOUT_MS, 20 * 60 * 1000); 103 conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false); 104 HTU.startMiniCluster(StartTestingClusterOption.builder().rsClass(RSForTest.class) 105 .numRegionServers(NB_SERVERS).build()); 106 107 } 108 109 @AfterClass 110 public static void tearDown() throws Exception { 111 HTU.shutdownMiniCluster(); 112 } 113 114 /** 115 * This test is for HBASE-26768,test the case that we have already clear the 116 * {@link RegionReplicationSink#failedReplicas} due to a flush all edit,which may in flusher 117 * thread,and then in the callback of replay, which may in Netty nioEventLoop,we add a replica to 118 * the {@link RegionReplicationSink#failedReplicas} because of a failure of replicating. 119 */ 120 @Test 121 public void test() throws Exception { 122 final HRegionForTest[] regions = this.createTable(); 123 final AtomicBoolean completedRef = new AtomicBoolean(false); 124 RegionReplicationSink regionReplicationSink = regions[0].getRegionReplicationSink().get(); 125 assertTrue(regionReplicationSink != null); 126 127 RegionReplicationSink spiedRegionReplicationSink = 128 this.setUpSpiedRegionReplicationSink(regionReplicationSink, regions[0], completedRef); 129 130 String oldThreadName = Thread.currentThread().getName(); 131 Thread.currentThread().setName(HRegionForTest.USER_THREAD_NAME); 132 try { 133 startTest = true; 134 /** 135 * Write First cell,replicating to secondary replica is error. 136 */ 137 byte[] rowKey1 = Bytes.toBytes(1); 138 139 regions[0].put(new Put(rowKey1).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); 140 regions[0].flushcache(true, true, FlushLifeCycleTracker.DUMMY); 141 142 HTU.waitFor(120000, () -> completedRef.get()); 143 assertTrue(spiedRegionReplicationSink.getFailedReplicas().isEmpty()); 144 } finally { 145 startTest = false; 146 Thread.currentThread().setName(oldThreadName); 147 } 148 } 149 150 private RegionReplicationSink setUpSpiedRegionReplicationSink( 151 final RegionReplicationSink regionReplicationSink, final HRegionForTest primaryRegion, 152 final AtomicBoolean completedRef) { 153 final AtomicInteger onCompleteCounter = new AtomicInteger(0); 154 final AtomicInteger getStartFlushAllDescriptorCounter = new AtomicInteger(0); 155 RegionReplicationSink spiedRegionReplicationSink = Mockito.spy(regionReplicationSink); 156 157 Mockito.doAnswer((invocationOnMock) -> { 158 if (!startTest) { 159 invocationOnMock.callRealMethod(); 160 return null; 161 } 162 int count = onCompleteCounter.incrementAndGet(); 163 if (count == 1) { 164 primaryRegion.cyclicBarrier.await(); 165 invocationOnMock.callRealMethod(); 166 completedRef.set(true); 167 return null; 168 } 169 invocationOnMock.callRealMethod(); 170 return null; 171 }).when(spiedRegionReplicationSink).onComplete(Mockito.anyList(), Mockito.anyMap()); 172 173 Mockito.doAnswer((invocationOnMock) -> { 174 if (!startTest) { 175 return invocationOnMock.callRealMethod(); 176 } 177 if ( 178 primaryRegion.prepareFlush 179 && Thread.currentThread().getName().equals(HRegionForTest.USER_THREAD_NAME) 180 ) { 181 int count = getStartFlushAllDescriptorCounter.incrementAndGet(); 182 if (count == 1) { 183 // onComplete could execute 184 primaryRegion.cyclicBarrier.await(); 185 return invocationOnMock.callRealMethod(); 186 } 187 } 188 return invocationOnMock.callRealMethod(); 189 }).when(spiedRegionReplicationSink).getStartFlushAllDescriptor(Mockito.any()); 190 191 primaryRegion.setRegionReplicationSink(spiedRegionReplicationSink); 192 return spiedRegionReplicationSink; 193 } 194 195 private HRegionForTest[] createTable() throws Exception { 196 TableDescriptor tableDescriptor = 197 TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(NB_SERVERS) 198 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); 199 HTU.getAdmin().createTable(tableDescriptor); 200 final HRegionForTest[] regions = new HRegionForTest[NB_SERVERS]; 201 for (int i = 0; i < NB_SERVERS; i++) { 202 HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); 203 List<HRegion> onlineRegions = rs.getRegions(tableName); 204 for (HRegion region : onlineRegions) { 205 int replicaId = region.getRegionInfo().getReplicaId(); 206 assertTrue(regions[replicaId] == null); 207 regions[region.getRegionInfo().getReplicaId()] = (HRegionForTest) region; 208 } 209 } 210 for (Region region : regions) { 211 assertNotNull(region); 212 } 213 return regions; 214 } 215 216 public static final class HRegionForTest extends HRegion { 217 static final String USER_THREAD_NAME = "TestReplicationHang"; 218 final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); 219 volatile boolean prepareFlush = false; 220 221 public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam, 222 TableDescriptor htd, RegionServerServices rsServices) { 223 super(fs, wal, confParam, htd, rsServices); 224 } 225 226 @SuppressWarnings("deprecation") 227 public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 228 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 229 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 230 } 231 232 public void setRegionReplicationSink(RegionReplicationSink regionReplicationSink) { 233 this.regionReplicationSink = Optional.of(regionReplicationSink); 234 } 235 236 @Override 237 protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid, 238 Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker, 239 FlushLifeCycleTracker tracker) throws IOException { 240 if (!startTest) { 241 return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status, 242 writeFlushWalMarker, tracker); 243 } 244 245 if ( 246 this.getRegionInfo().getReplicaId() == 0 247 && Thread.currentThread().getName().equals(USER_THREAD_NAME) 248 ) { 249 this.prepareFlush = true; 250 } 251 try { 252 PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush, 253 status, writeFlushWalMarker, tracker); 254 255 return result; 256 } finally { 257 if ( 258 this.getRegionInfo().getReplicaId() == 0 259 && Thread.currentThread().getName().equals(USER_THREAD_NAME) 260 ) { 261 this.prepareFlush = false; 262 } 263 } 264 265 } 266 } 267 268 public static final class ErrorReplayRSRpcServices extends RSRpcServices { 269 private static final AtomicInteger callCounter = new AtomicInteger(0); 270 271 public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException { 272 super(rs); 273 } 274 275 @Override 276 public ReplicateWALEntryResponse replicateToReplica(RpcController rpcController, 277 ReplicateWALEntryRequest replicateWALEntryRequest) throws ServiceException { 278 279 if (!startTest) { 280 return super.replicateToReplica(rpcController, replicateWALEntryRequest); 281 } 282 283 List<WALEntry> entries = replicateWALEntryRequest.getEntryList(); 284 if (CollectionUtils.isEmpty(entries)) { 285 return ReplicateWALEntryResponse.getDefaultInstance(); 286 } 287 ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); 288 289 HRegion region; 290 try { 291 region = server.getRegionByEncodedName(regionName.toStringUtf8()); 292 } catch (NotServingRegionException e) { 293 throw new ServiceException(e); 294 } 295 296 if ( 297 !region.getRegionInfo().getTable().equals(tableName) 298 || region.getRegionInfo().getReplicaId() != 1 299 ) { 300 return super.replicateToReplica(rpcController, replicateWALEntryRequest); 301 } 302 303 /** 304 * Simulate the first cell replicating error. 305 */ 306 int count = callCounter.incrementAndGet(); 307 if (count > 1) { 308 return super.replicateToReplica(rpcController, replicateWALEntryRequest); 309 } 310 throw new ServiceException(new DoNotRetryIOException("Inject error!")); 311 } 312 } 313 314 public static final class RSForTest 315 extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer { 316 317 public RSForTest(Configuration conf) throws IOException, InterruptedException { 318 super(conf); 319 } 320 321 @Override 322 protected RSRpcServices createRpcServices() throws IOException { 323 return new ErrorReplayRSRpcServices(this); 324 } 325 } 326}