001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022 023import java.io.IOException; 024import java.util.Optional; 025import java.util.concurrent.atomic.AtomicReference; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HBaseClassTestRule; 028import org.apache.hadoop.hbase.HBaseTestingUtil; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.HRegionLocation; 031import org.apache.hadoop.hbase.ServerName; 032import org.apache.hadoop.hbase.StartTestingClusterOption; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 035import org.apache.hadoop.hbase.client.Connection; 036import org.apache.hadoop.hbase.client.ConnectionFactory; 037import org.apache.hadoop.hbase.client.Put; 038import org.apache.hadoop.hbase.client.RegionInfo; 039import org.apache.hadoop.hbase.client.RetriesExhaustedException; 040import org.apache.hadoop.hbase.client.Table; 041import org.apache.hadoop.hbase.client.TableDescriptor; 042import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 043import org.apache.hadoop.hbase.coprocessor.ObserverContext; 044import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 046import org.apache.hadoop.hbase.coprocessor.RegionObserver; 047import org.apache.hadoop.hbase.ipc.CallRunner; 048import org.apache.hadoop.hbase.ipc.PluggableBlockingQueue; 049import org.apache.hadoop.hbase.ipc.PriorityFunction; 050import org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.testclassification.RegionServerTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.JVMClusterUtil; 055import org.apache.hadoop.hbase.util.Threads; 056import org.junit.AfterClass; 057import org.junit.BeforeClass; 058import org.junit.ClassRule; 059import org.junit.Test; 060import org.junit.experimental.categories.Category; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064@Category({ RegionServerTests.class, MediumTests.class }) 065public class TestRegionServerRejectDuringAbort { 066 067 @ClassRule 068 public static final HBaseClassTestRule CLASS_RULE = 069 HBaseClassTestRule.forClass(TestRegionServerRejectDuringAbort.class); 070 071 private static final Logger LOG = 072 LoggerFactory.getLogger(TestRegionServerRejectDuringAbort.class); 073 074 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 075 076 private static TableName TABLE_NAME = TableName.valueOf("RSRejectOnAbort"); 077 078 private static byte[] CF = Bytes.toBytes("cf"); 079 080 private static final int REGIONS_NUM = 5; 081 082 private static final AtomicReference<Exception> THROWN_EXCEPTION = new AtomicReference<>(null); 083 084 private static volatile boolean shouldThrowTooBig = false; 085 086 @BeforeClass 087 public static void setUp() throws Exception { 088 // Will schedule a abort timeout task after SLEEP_TIME_WHEN_CLOSE_REGION ms 089 UTIL.getConfiguration().set("hbase.ipc.server.callqueue.type", "pluggable"); 090 UTIL.getConfiguration().setClass("hbase.ipc.server.callqueue.pluggable.queue.class.name", 091 CallQueueTooBigThrowingQueue.class, PluggableBlockingQueue.class); 092 StartTestingClusterOption option = 093 StartTestingClusterOption.builder().numRegionServers(2).build(); 094 UTIL.startMiniCluster(option); 095 TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME) 096 .setCoprocessor(SleepWhenCloseCoprocessor.class.getName()) 097 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF).build()).build(); 098 UTIL.getAdmin().createTable(td, Bytes.toBytes("0"), Bytes.toBytes("9"), REGIONS_NUM); 099 } 100 101 public static final class CallQueueTooBigThrowingQueue extends TestPluggableQueueImpl { 102 103 public CallQueueTooBigThrowingQueue(int maxQueueLength, PriorityFunction priority, 104 Configuration conf) { 105 super(maxQueueLength, priority, conf); 106 } 107 108 @Override 109 public boolean offer(CallRunner callRunner) { 110 if (shouldThrowTooBig && callRunner.getRpcCall().getRequestAttribute("test") != null) { 111 return false; 112 } 113 return super.offer(callRunner); 114 } 115 } 116 117 @AfterClass 118 public static void tearDown() throws Exception { 119 UTIL.shutdownMiniCluster(); 120 } 121 122 /** 123 * Tests that the logic in ServerRpcConnection works such that if the server is aborted, it short 124 * circuits any other logic. This means we no longer even attempt to enqueue the request onto the 125 * call queue. We verify this by using a special call queue which we can trigger to always return 126 * CallQueueTooBigException. If the logic works, despite forcing those exceptions, we should not 127 * see them. 128 */ 129 @Test 130 public void testRejectRequestsOnAbort() throws Exception { 131 // We don't want to disrupt the server carrying meta, because we plan to disrupt requests to 132 // the server. Disrupting meta requests messes with the test. 133 HRegionServer serverWithoutMeta = null; 134 for (JVMClusterUtil.RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster() 135 .getRegionServerThreads()) { 136 HRegionServer regionServer = regionServerThread.getRegionServer(); 137 if ( 138 regionServer.getRegions(TableName.META_TABLE_NAME).isEmpty() 139 && !regionServer.getRegions(TABLE_NAME).isEmpty() 140 ) { 141 serverWithoutMeta = regionServer; 142 break; 143 } 144 } 145 146 assertNotNull("couldn't find a server without meta, but with test table regions", 147 serverWithoutMeta); 148 149 Thread writer = new Thread(getWriterThreadRunnable(serverWithoutMeta.getServerName())); 150 writer.setDaemon(true); 151 writer.start(); 152 153 // Trigger the abort. Our WriterThread will detect the first RegionServerAbortedException 154 // and trigger our custom queue to reject any more requests. This would typically result in 155 // CallQueueTooBigException, unless our logic in ServerRpcConnection to preempt the processing 156 // of a request is working. 157 serverWithoutMeta.abort("Abort RS for test"); 158 159 UTIL.waitFor(60_000, () -> THROWN_EXCEPTION.get() != null); 160 assertEquals(THROWN_EXCEPTION.get().getCause().getClass(), RegionServerAbortedException.class); 161 } 162 163 private Runnable getWriterThreadRunnable(ServerName loadServer) { 164 return () -> { 165 try { 166 Configuration conf = UTIL.getConfiguration(); 167 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); 168 try (Connection conn = ConnectionFactory.createConnection(conf); 169 Table table = conn.getTableBuilder(TABLE_NAME, null) 170 .setRequestAttribute("test", new byte[] { 0 }).build()) { 171 // find the first region to exist on our test server, then submit requests to it 172 for (HRegionLocation regionLocation : table.getRegionLocator().getAllRegionLocations()) { 173 if (regionLocation.getServerName().equals(loadServer)) { 174 submitRequestsToRegion(table, regionLocation.getRegion()); 175 return; 176 } 177 } 178 throw new RuntimeException("Failed to find any regions for loadServer " + loadServer); 179 } 180 } catch (Exception e) { 181 LOG.warn("Failed to load data", e); 182 synchronized (THROWN_EXCEPTION) { 183 THROWN_EXCEPTION.set(e); 184 THROWN_EXCEPTION.notifyAll(); 185 } 186 } 187 }; 188 } 189 190 private void submitRequestsToRegion(Table table, RegionInfo regionInfo) throws IOException { 191 // We will block closes of the regions with a CP, so no need to worry about the region getting 192 // reassigned. Just use the same rowkey always. 193 byte[] rowKey = getRowKeyWithin(regionInfo); 194 195 int i = 0; 196 while (true) { 197 try { 198 i++; 199 table.put(new Put(rowKey).addColumn(CF, Bytes.toBytes(i), Bytes.toBytes(i))); 200 } catch (IOException e) { 201 // only catch RegionServerAbortedException once. After that, the next exception thrown 202 // is our test case 203 if ( 204 !shouldThrowTooBig && e instanceof RetriesExhaustedException 205 && e.getCause() instanceof RegionServerAbortedException 206 ) { 207 shouldThrowTooBig = true; 208 } else { 209 throw e; 210 } 211 } 212 213 // small sleep to relieve pressure 214 Threads.sleep(10); 215 } 216 } 217 218 private byte[] getRowKeyWithin(RegionInfo regionInfo) { 219 byte[] rowKey; 220 // region is start of table, find one after start key 221 if (regionInfo.getStartKey().length == 0) { 222 if (regionInfo.getEndKey().length == 0) { 223 // doesn't matter, single region table 224 return Bytes.toBytes(1); 225 } else { 226 // find a row just before endkey 227 rowKey = Bytes.copy(regionInfo.getEndKey()); 228 rowKey[rowKey.length - 1]--; 229 return rowKey; 230 } 231 } else { 232 return regionInfo.getStartKey(); 233 } 234 } 235 236 public static class SleepWhenCloseCoprocessor implements RegionCoprocessor, RegionObserver { 237 238 public SleepWhenCloseCoprocessor() { 239 } 240 241 @Override 242 public Optional<RegionObserver> getRegionObserver() { 243 return Optional.of(this); 244 } 245 246 @Override 247 public void preClose(ObserverContext<? extends RegionCoprocessorEnvironment> c, 248 boolean abortRequested) throws IOException { 249 // Wait so that the region can't close until we get the information we need from our test 250 UTIL.waitFor(60_000, () -> THROWN_EXCEPTION.get() != null); 251 } 252 } 253}