001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.regionreplication; 019 020import static org.junit.Assert.assertNotNull; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.Collection; 025import java.util.List; 026import java.util.Optional; 027import java.util.concurrent.BrokenBarrierException; 028import java.util.concurrent.CyclicBarrier; 029import java.util.concurrent.atomic.AtomicInteger; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.DoNotRetryIOException; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.NotServingRegionException; 038import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 039import org.apache.hadoop.hbase.StartTestingClusterOption; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.RegionInfo; 044import org.apache.hadoop.hbase.client.RegionReplicaUtil; 045import org.apache.hadoop.hbase.client.TableDescriptor; 046import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 047import org.apache.hadoop.hbase.monitoring.MonitoredTask; 048import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 049import org.apache.hadoop.hbase.regionserver.HRegion; 050import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; 051import org.apache.hadoop.hbase.regionserver.HRegionServer; 052import org.apache.hadoop.hbase.regionserver.HStore; 053import org.apache.hadoop.hbase.regionserver.MemStoreFlusher; 054import org.apache.hadoop.hbase.regionserver.RSRpcServices; 055import org.apache.hadoop.hbase.regionserver.Region; 056import org.apache.hadoop.hbase.regionserver.RegionServerServices; 057import org.apache.hadoop.hbase.testclassification.LargeTests; 058import org.apache.hadoop.hbase.testclassification.RegionServerTests; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 061import org.apache.hadoop.hbase.wal.WAL; 062import org.junit.AfterClass; 063import org.junit.BeforeClass; 064import org.junit.ClassRule; 065import org.junit.Test; 066import org.junit.experimental.categories.Category; 067 068import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 069import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 070import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 071import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 072 073import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; 077 078@Category({ RegionServerTests.class, LargeTests.class }) 079public class TestRegionReplicationForFlushMarker { 080 081 @ClassRule 082 public static final HBaseClassTestRule CLASS_RULE = 083 HBaseClassTestRule.forClass(TestRegionReplicationForFlushMarker.class); 084 085 private static final byte[] FAMILY = Bytes.toBytes("family_test"); 086 087 private static final byte[] QUAL = Bytes.toBytes("qualifier_test"); 088 089 private static final HBaseTestingUtil HTU = new HBaseTestingUtil(); 090 private static final int NB_SERVERS = 2; 091 092 private static TableName tableName = TableName.valueOf("TestRegionReplicationForFlushMarker"); 093 private static volatile boolean startTest = false; 094 095 @BeforeClass 096 public static void setUp() throws Exception { 097 Configuration conf = HTU.getConfiguration(); 098 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); 099 conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class); 100 conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 1); 101 conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 10 * 60 * 1000); 102 conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 20 * 60 * 1000); 103 conf.setLong(RegionReplicationSink.META_EDIT_RPC_TIMEOUT_MS, 10 * 60 * 1000); 104 conf.setLong(RegionReplicationSink.META_EDIT_OPERATION_TIMEOUT_MS, 20 * 60 * 1000); 105 conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false); 106 conf.setInt(RegionReplicationFlushRequester.MIN_INTERVAL_SECS, 3); 107 HTU.startMiniCluster(StartTestingClusterOption.builder().rsClass(RSForTest.class) 108 .numRegionServers(NB_SERVERS).build()); 109 110 } 111 112 @AfterClass 113 public static void tearDown() throws Exception { 114 HTU.shutdownMiniCluster(); 115 } 116 117 /** 118 * This test is for HBASE-26960, before HBASE-26960, {@link MemStoreFlusher} does not write the 119 * {@link FlushAction#CANNOT_FLUSH} marker to the WAL when the memstore is empty,so if the 120 * {@link RegionReplicationSink} request a flush when the memstore is empty, it could not receive 121 * the {@link FlushAction#CANNOT_FLUSH} and the replication may be hanged. After HBASE-26768,when 122 * the {@link RegionReplicationSink} request a flush when the memstore is empty,even it does not 123 * writes the {@link FlushAction#CANNOT_FLUSH} marker to the WAL,we also replicate the 124 * {@link FlushAction#CANNOT_FLUSH} marker to the secondary region replica. 125 */ 126 @Test 127 public void testCannotFlushMarker() throws Exception { 128 final HRegionForTest[] regions = this.createTable(); 129 RegionReplicationSink regionReplicationSink = regions[0].getRegionReplicationSink().get(); 130 assertTrue(regionReplicationSink != null); 131 132 String oldThreadName = Thread.currentThread().getName(); 133 Thread.currentThread().setName(HRegionForTest.USER_THREAD_NAME); 134 try { 135 136 byte[] rowKey1 = Bytes.toBytes(1); 137 startTest = true; 138 /** 139 * Write First cell,replicating to secondary replica is error,and then 140 * {@link RegionReplicationSink} request flush,after {@link RegionReplicationSink} receiving 141 * the {@link FlushAction#START_FLUSH},the {@link RegionReplicationSink#failedReplicas} is 142 * cleared,but replicating {@link FlushAction#START_FLUSH} is failed again,so 143 * {@link RegionReplicationSink} request flush once more, but now memstore is empty,so the 144 * {@link MemStoreFlusher} just write a {@link FlushAction#CANNOT_FLUSH} marker to the WAL. 145 */ 146 regions[0].put(new Put(rowKey1).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); 147 /** 148 * Wait for the {@link FlushAction#CANNOT_FLUSH} is written and initiating replication 149 */ 150 regions[0].cyclicBarrier.await(); 151 assertTrue(regions[0].prepareFlushCounter.get() == 2); 152 /** 153 * The {@link RegionReplicationSink#failedReplicas} is cleared by the 154 * {@link FlushAction#CANNOT_FLUSH} marker. 155 */ 156 assertTrue(regionReplicationSink.getFailedReplicas().isEmpty()); 157 } finally { 158 startTest = false; 159 Thread.currentThread().setName(oldThreadName); 160 } 161 } 162 163 private HRegionForTest[] createTable() throws Exception { 164 TableDescriptor tableDescriptor = 165 TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(NB_SERVERS) 166 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); 167 HTU.getAdmin().createTable(tableDescriptor); 168 final HRegionForTest[] regions = new HRegionForTest[NB_SERVERS]; 169 for (int i = 0; i < NB_SERVERS; i++) { 170 HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); 171 List<HRegion> onlineRegions = rs.getRegions(tableName); 172 for (HRegion region : onlineRegions) { 173 int replicaId = region.getRegionInfo().getReplicaId(); 174 assertTrue(regions[replicaId] == null); 175 regions[region.getRegionInfo().getReplicaId()] = (HRegionForTest) region; 176 } 177 } 178 for (Region region : regions) { 179 assertNotNull(region); 180 } 181 return regions; 182 } 183 184 public static final class HRegionForTest extends HRegion { 185 static final String USER_THREAD_NAME = "TestRegionReplicationForFlushMarker"; 186 final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); 187 final AtomicInteger prepareFlushCounter = new AtomicInteger(0); 188 189 public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam, 190 TableDescriptor htd, RegionServerServices rsServices) { 191 super(fs, wal, confParam, htd, rsServices); 192 } 193 194 @SuppressWarnings("deprecation") 195 public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 196 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 197 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 198 } 199 200 public void setRegionReplicationSink(RegionReplicationSink regionReplicationSink) { 201 this.regionReplicationSink = Optional.of(regionReplicationSink); 202 } 203 204 @Override 205 protected void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException { 206 // not write the region open marker to interrupt the test. 207 } 208 209 @Override 210 protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid, 211 Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker, 212 FlushLifeCycleTracker tracker) throws IOException { 213 if (!startTest) { 214 return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status, 215 writeFlushWalMarker, tracker); 216 } 217 218 if (this.getRegionInfo().getReplicaId() != 0) { 219 return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status, 220 writeFlushWalMarker, tracker); 221 } 222 223 try { 224 PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush, 225 status, writeFlushWalMarker, tracker); 226 this.prepareFlushCounter.incrementAndGet(); 227 /** 228 * First flush is {@link FlushAction#START_FLUSH} marker and the second flush is 229 * {@link FlushAction#CANNOT_FLUSH} marker because the memstore is empty. 230 */ 231 if ( 232 this.prepareFlushCounter.get() == 2 && result.getResult() != null 233 && result.getResult().getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY 234 ) { 235 236 cyclicBarrier.await(); 237 } 238 return result; 239 } catch (BrokenBarrierException | InterruptedException e) { 240 throw new RuntimeException(e); 241 } 242 243 } 244 } 245 246 public static final class ErrorReplayRSRpcServices extends RSRpcServices { 247 private static final AtomicInteger callCounter = new AtomicInteger(0); 248 249 public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException { 250 super(rs); 251 } 252 253 @Override 254 public ReplicateWALEntryResponse replicateToReplica(RpcController rpcController, 255 ReplicateWALEntryRequest replicateWALEntryRequest) throws ServiceException { 256 257 if (!startTest) { 258 return super.replicateToReplica(rpcController, replicateWALEntryRequest); 259 } 260 261 List<WALEntry> entries = replicateWALEntryRequest.getEntryList(); 262 if (CollectionUtils.isEmpty(entries)) { 263 return ReplicateWALEntryResponse.getDefaultInstance(); 264 } 265 ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); 266 267 HRegion region; 268 try { 269 region = server.getRegionByEncodedName(regionName.toStringUtf8()); 270 } catch (NotServingRegionException e) { 271 throw new ServiceException(e); 272 } 273 274 if ( 275 !region.getRegionInfo().getTable().equals(tableName) 276 || region.getRegionInfo().getReplicaId() != 1 277 ) { 278 return super.replicateToReplica(rpcController, replicateWALEntryRequest); 279 } 280 281 /** 282 * Simulate the first cell write and {@link FlushAction#START_FLUSH} marker replicating error. 283 */ 284 int count = callCounter.incrementAndGet(); 285 if (count > 2) { 286 return super.replicateToReplica(rpcController, replicateWALEntryRequest); 287 } 288 throw new ServiceException(new DoNotRetryIOException("Inject error!")); 289 } 290 } 291 292 public static final class RSForTest 293 extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer { 294 295 public RSForTest(Configuration conf) throws IOException, InterruptedException { 296 super(conf); 297 } 298 299 @Override 300 protected RSRpcServices createRpcServices() throws IOException { 301 return new ErrorReplayRSRpcServices(this); 302 } 303 } 304 305}