001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.regionreplication; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.io.IOException; 026import java.util.Arrays; 027import java.util.List; 028import java.util.Optional; 029import java.util.concurrent.atomic.AtomicInteger; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Abortable; 034import org.apache.hadoop.hbase.DoNotRetryIOException; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.StartTestingClusterOption; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 041import org.apache.hadoop.hbase.client.Consistency; 042import org.apache.hadoop.hbase.client.Get; 043import org.apache.hadoop.hbase.client.Mutation; 044import org.apache.hadoop.hbase.client.Put; 045import org.apache.hadoop.hbase.client.RegionInfo; 046import org.apache.hadoop.hbase.client.RegionReplicaUtil; 047import org.apache.hadoop.hbase.client.Result; 048import org.apache.hadoop.hbase.client.Table; 049import org.apache.hadoop.hbase.client.TableDescriptor; 050import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 051import org.apache.hadoop.hbase.coprocessor.ObserverContext; 052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 054import org.apache.hadoop.hbase.coprocessor.RegionObserver; 055import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 056import org.apache.hadoop.hbase.regionserver.HRegion; 057import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; 058import org.apache.hadoop.hbase.regionserver.HRegionServer; 059import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 060import org.apache.hadoop.hbase.regionserver.RegionServerServices; 061import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 062import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; 063import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; 064import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 065import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException; 066import org.apache.hadoop.hbase.testclassification.LargeTests; 067import org.apache.hadoop.hbase.testclassification.RegionServerTests; 068import org.apache.hadoop.hbase.util.Bytes; 069import org.apache.hadoop.hbase.util.CommonFSUtils; 070import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 071import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; 072import org.apache.hadoop.hbase.wal.WAL; 073import org.apache.hadoop.hbase.wal.WALFactory; 074import org.apache.hadoop.hbase.wal.WALKeyImpl; 075import org.apache.hadoop.hbase.wal.WALProvider; 076import org.junit.AfterClass; 077import org.junit.BeforeClass; 078import org.junit.ClassRule; 079import org.junit.Test; 080import org.junit.experimental.categories.Category; 081import org.mockito.Mockito; 082 083import org.apache.hbase.thirdparty.io.netty.channel.Channel; 084import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 085 086@Category({ RegionServerTests.class, LargeTests.class }) 087public class TestRegionReplicationForWriteException { 088 089 @ClassRule 090 public static final HBaseClassTestRule CLASS_RULE = 091 HBaseClassTestRule.forClass(TestRegionReplicationForWriteException.class); 092 093 private static final byte[] FAMILY = Bytes.toBytes("family_test"); 094 095 private static final byte[] QUAL = Bytes.toBytes("qualifier_test"); 096 097 private static final HBaseTestingUtil HTU = new HBaseTestingUtil(); 098 private static final int NB_SERVERS = 2; 099 100 private static TableName tableName = TableName.valueOf("TestRegionReplicationForWriteException"); 101 private static volatile boolean testWALTimout = false; 102 private static volatile boolean testCP = false; 103 private static final long timeoutMIlliseconds = 3000; 104 private static final String USER_THREAD_NAME = tableName.getNameAsString(); 105 106 @BeforeClass 107 public static void setUp() throws Exception { 108 Configuration conf = HTU.getConfiguration(); 109 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); 110 conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class); 111 conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 1); 112 conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 10 * 60 * 1000); 113 conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 20 * 60 * 1000); 114 conf.setLong(RegionReplicationSink.META_EDIT_RPC_TIMEOUT_MS, 10 * 60 * 1000); 115 conf.setLong(RegionReplicationSink.META_EDIT_OPERATION_TIMEOUT_MS, 20 * 60 * 1000); 116 conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false); 117 conf.setInt(RegionReplicationFlushRequester.MIN_INTERVAL_SECS, 3); 118 conf.setClass(WALFactory.WAL_PROVIDER, SlowAsyncFSWALProvider.class, WALProvider.class); 119 conf.setLong(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS, timeoutMIlliseconds); 120 HTU.startMiniCluster(StartTestingClusterOption.builder().numRegionServers(NB_SERVERS).build()); 121 122 } 123 124 @AfterClass 125 public static void tearDown() throws Exception { 126 HTU.shutdownMiniCluster(); 127 } 128 129 /** 130 * This test is for HBASE-27303. 131 */ 132 @Test 133 public void testWriteException() throws Exception { 134 final HRegionForTest[] regions = this.createTable(); 135 RegionReplicationSink regionReplicationSink = regions[0].getRegionReplicationSink().get(); 136 assertTrue(regionReplicationSink != null); 137 final AtomicInteger replicateCounter = new AtomicInteger(0); 138 this.setUpSpiedRegionReplicationSink(regionReplicationSink, regions[0], replicateCounter); 139 140 String oldThreadName = Thread.currentThread().getName(); 141 Thread.currentThread().setName(USER_THREAD_NAME); 142 try { 143 testCP = true; 144 try { 145 byte[] rowKey1 = Bytes.toBytes(1); 146 byte[] value1 = Bytes.toBytes(3); 147 /** 148 * Write first put,{@link WAL#sync} is successful but {@link RegionObserver#postBatchMutate} 149 * throws exception,the rowkey1 is applied to primary and secondary replicas successfully. 150 */ 151 try { 152 regions[0].put(new Put(rowKey1).addColumn(FAMILY, QUAL, value1)); 153 fail(); 154 } catch (DoNotRetryIOException e) { 155 assertTrue(e.getMessage().equals(MyRegionObserver.ERROR_MESSAGE)); 156 } 157 158 try (Table table = HTU.getConnection().getTable(tableName)) { 159 assertTrue(checkReplica(table, FAMILY, QUAL, rowKey1, value1, 0)); 160 HTU.waitFor(30000, () -> checkReplica(table, FAMILY, QUAL, rowKey1, value1, 1)); 161 } 162 } finally { 163 testCP = false; 164 } 165 166 byte[] rowKey2 = Bytes.toBytes(2); 167 byte[] value2 = Bytes.toBytes(6); 168 replicateCounter.set(0); 169 testWALTimout = true; 170 try { 171 /** 172 * Write second put,the {@link WAL#sync} timeout and throws 173 * {@link WALSyncTimeoutIOException},{@link HRegion#put} is failed and rowKey2 is not 174 * applied to primary and secondary replicas. 175 */ 176 try { 177 regions[0].put(new Put(rowKey2).addColumn(FAMILY, QUAL, value2)); 178 fail(); 179 } catch (WALSyncTimeoutIOException e) { 180 assertTrue(e != null); 181 } 182 183 assertTrue(regions[0].getRSServices().isAborted()); 184 assertTrue(replicateCounter.get() == 0); 185 Thread.sleep(2000); 186 try (Table table = HTU.getConnection().getTable(tableName)) { 187 assertFalse(checkReplica(table, FAMILY, QUAL, rowKey2, value2, 1)); 188 } 189 } finally { 190 testWALTimout = false; 191 } 192 } finally { 193 Thread.currentThread().setName(oldThreadName); 194 } 195 } 196 197 private RegionReplicationSink setUpSpiedRegionReplicationSink( 198 final RegionReplicationSink regionReplicationSink, final HRegionForTest primaryRegion, 199 final AtomicInteger counter) { 200 RegionReplicationSink spiedRegionReplicationSink = Mockito.spy(regionReplicationSink); 201 202 Mockito.doAnswer((invocationOnMock) -> { 203 if (!testWALTimout || !USER_THREAD_NAME.equals(Thread.currentThread().getName())) { 204 invocationOnMock.callRealMethod(); 205 return null; 206 } 207 WALKeyImpl walKey = invocationOnMock.getArgument(0); 208 if (!walKey.getTableName().equals(tableName)) { 209 invocationOnMock.callRealMethod(); 210 return null; 211 } 212 counter.incrementAndGet(); 213 invocationOnMock.callRealMethod(); 214 return null; 215 }).when(spiedRegionReplicationSink).add(Mockito.any(), Mockito.any(), Mockito.any()); 216 primaryRegion.setRegionReplicationSink(spiedRegionReplicationSink); 217 return spiedRegionReplicationSink; 218 } 219 220 private static boolean checkReplica(Table table, byte[] fam, byte[] qual, byte[] rowKey, 221 byte[] expectValue, int replicaId) throws IOException { 222 Get get = new Get(rowKey).setConsistency(Consistency.TIMELINE).setReplicaId(replicaId); 223 Result result = table.get(get); 224 byte[] value = result.getValue(fam, qual); 225 return value != null && value.length > 0 && Arrays.equals(expectValue, value); 226 } 227 228 private HRegionForTest[] createTable() throws Exception { 229 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 230 .setRegionReplication(NB_SERVERS).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 231 .setCoprocessor(MyRegionObserver.class.getName()).build(); 232 HTU.getAdmin().createTable(tableDescriptor); 233 final HRegionForTest[] regions = new HRegionForTest[NB_SERVERS]; 234 for (int i = 0; i < NB_SERVERS; i++) { 235 HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); 236 List<HRegion> onlineRegions = rs.getRegions(tableName); 237 for (HRegion region : onlineRegions) { 238 int replicaId = region.getRegionInfo().getReplicaId(); 239 assertTrue(regions[replicaId] == null); 240 regions[region.getRegionInfo().getReplicaId()] = (HRegionForTest) region; 241 } 242 } 243 for (HRegionForTest region : regions) { 244 assertNotNull(region); 245 } 246 return regions; 247 } 248 249 public static final class HRegionForTest extends HRegion { 250 251 public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam, 252 TableDescriptor htd, RegionServerServices rsServices) { 253 super(fs, wal, confParam, htd, rsServices); 254 } 255 256 @SuppressWarnings("deprecation") 257 public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 258 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 259 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 260 } 261 262 public void setRegionReplicationSink(RegionReplicationSink regionReplicationSink) { 263 this.regionReplicationSink = Optional.of(regionReplicationSink); 264 } 265 266 public RegionServerServices getRSServices() { 267 return this.rsServices; 268 } 269 } 270 271 public static class SlowAsyncFSWAL extends AsyncFSWAL { 272 273 public SlowAsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir, 274 String archiveDir, Configuration conf, List<WALActionsListener> listeners, 275 boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, 276 Class<? extends Channel> channelClass, StreamSlowMonitor monitor) 277 throws FailedLogCloseException, IOException { 278 super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, 279 suffix, null, null, eventLoopGroup, channelClass, monitor); 280 } 281 282 @Override 283 protected void atHeadOfRingBufferEventHandlerAppend() { 284 if (testWALTimout) { 285 try { 286 Thread.sleep(timeoutMIlliseconds + 1000); 287 } catch (InterruptedException e) { 288 throw new RuntimeException(e); 289 } 290 } 291 super.atHeadOfRingBufferEventHandlerAppend(); 292 } 293 294 } 295 296 public static class SlowAsyncFSWALProvider extends AsyncFSWALProvider { 297 298 @Override 299 protected AsyncFSWAL createWAL() throws IOException { 300 return new SlowAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable, 301 CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.getFactoryId()), 302 getWALArchiveDirectoryName(conf, factory.getFactoryId()), conf, listeners, true, logPrefix, 303 META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup, 304 channelClass, factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId)); 305 } 306 307 } 308 309 public static class MyRegionObserver implements RegionCoprocessor, RegionObserver { 310 311 private static final String ERROR_MESSAGE = "Inject error!"; 312 313 @Override 314 public Optional<RegionObserver> getRegionObserver() { 315 return Optional.of(this); 316 } 317 318 @Override 319 public void postBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c, 320 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 321 if (!testCP || !RegionReplicaUtil.isDefaultReplica(c.getEnvironment().getRegionInfo())) { 322 return; 323 } 324 throw new DoNotRetryIOException(ERROR_MESSAGE); 325 } 326 } 327 328}