/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.ipc.CallRunner;
import org.apache.hadoop.hbase.ipc.PluggableBlockingQueue;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks that requests sent to a region server after it has been aborted fail with
 * {@link RegionServerAbortedException}, even when the server's call queue has been rigged to
 * refuse them.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestRegionServerRejectDuringAbort {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionServerRejectDuringAbort.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestRegionServerRejectDuringAbort.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static final TableName TABLE_NAME = TableName.valueOf("RSRejectOnAbort");

  private static final byte[] CF = Bytes.toBytes("cf");

  private static final int REGIONS_NUM = 5;

  /** Exception that terminated the writer thread; stays {@code null} while it is still writing. */
  private static final AtomicReference<Exception> THROWN_EXCEPTION = new AtomicReference<>(null);

  /** When set, {@link CallQueueTooBigThrowingQueue} refuses calls carrying the "test" attribute. */
  private static volatile boolean shouldThrowTooBig = false;

  /**
   * Starts a two region server mini cluster that uses {@link CallQueueTooBigThrowingQueue} as its
   * pluggable call queue, then creates the test table with {@link SleepWhenCloseCoprocessor}
   * attached.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    // Plug in our own call queue so that the test can make the server refuse to enqueue requests
    // on demand.
    UTIL.getConfiguration().set("hbase.ipc.server.callqueue.type", "pluggable");
    UTIL.getConfiguration().setClass("hbase.ipc.server.callqueue.pluggable.queue.class.name",
      CallQueueTooBigThrowingQueue.class, PluggableBlockingQueue.class);
    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build();
    UTIL.startMiniCluster(option);
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME)
      .setCoprocessor(SleepWhenCloseCoprocessor.class.getName())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF).build()).build();
    UTIL.getAdmin().createTable(td, Bytes.toBytes("0"), Bytes.toBytes("9"), REGIONS_NUM);
  }

  /**
   * Call queue that refuses any call carrying the "test" request attribute once
   * {@link #shouldThrowTooBig} has been switched on. Everything else is handed to the parent
   * queue.
   */
  public static final class CallQueueTooBigThrowingQueue extends TestPluggableQueueImpl {

    public CallQueueTooBigThrowingQueue(int maxQueueLength, PriorityFunction priority,
      Configuration conf) {
      super(maxQueueLength, priority, conf);
    }

    @Override
    public boolean offer(CallRunner callRunner) {
      if (shouldThrowTooBig && callRunner.getRpcCall().getRequestAttribute("test") != null) {
        // Refuse the call, as a full queue would.
        return false;
      }
      return super.offer(callRunner);
    }
  }

  @AfterClass
  public static void tearDown() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /**
   * Tests that the logic in ServerRpcConnection works such that if the server is aborted, it short
   * circuits any other logic. This means we no longer even attempt to enqueue the request onto the
   * call queue. We verify this by using a special call queue which we can trigger to always return
   * CallQueueTooBigException. If the logic works, despite forcing those exceptions, we should not
   * see them.
   */
  @Test
  public void testRejectRequestsOnAbort() throws Exception {
    // We don't want to disrupt the server carrying meta, because we plan to disrupt requests to
    // the server. Disrupting meta requests messes with the test.
    HRegionServer serverWithoutMeta = null;
    for (JVMClusterUtil.RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
      .getRegionServerThreads()) {
      HRegionServer regionServer = regionServerThread.getRegionServer();
      if (
        regionServer.getRegions(TableName.META_TABLE_NAME).isEmpty()
          && !regionServer.getRegions(TABLE_NAME).isEmpty()
      ) {
        serverWithoutMeta = regionServer;
        break;
      }
    }

    assertNotNull("couldn't find a server without meta, but with test table regions",
      serverWithoutMeta);

    Thread writer = new Thread(getWriterThreadRunnable(serverWithoutMeta.getServerName()));
    writer.setDaemon(true);
    writer.start();

    // Trigger the abort. Our WriterThread will detect the first RegionServerAbortedException
    // and trigger our custom queue to reject any more requests. This would typically result in
    // CallQueueTooBigException, unless our logic in ServerRpcConnection to preempt the processing
    // of a request is working.
    serverWithoutMeta.abort("Abort RS for test");

    UTIL.waitFor(60_000, () -> THROWN_EXCEPTION.get() != null);

    // Report a cause-less failure (e.g. the writer never found a region to load) as a readable
    // assertion failure rather than a NullPointerException.
    Exception thrown = THROWN_EXCEPTION.get();
    assertNotNull("writer thread failed with an exception that has no cause: " + thrown,
      thrown.getCause());
    assertEquals(RegionServerAbortedException.class, thrown.getCause().getClass());
  }

  /**
   * Builds the task run by the writer thread. It opens its own connection, tags every request
   * with the "test" attribute, and keeps writing to the first region it finds on
   * {@code loadServer}. Whatever exception ends the task is published through
   * {@link #THROWN_EXCEPTION}.
   * @param loadServer the region server whose region should receive the writes
   * @return the writer task
   */
  private Runnable getWriterThreadRunnable(ServerName loadServer) {
    return () -> {
      try {
        // Lower the retry count on a private copy, so the configuration shared through UTIL is
        // left untouched.
        Configuration conf = new Configuration(UTIL.getConfiguration());
        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
        try (Connection conn = ConnectionFactory.createConnection(conf);
          Table table = conn.getTableBuilder(TABLE_NAME, null)
            .setRequestAttribute("test", new byte[] { 0 }).build()) {
          // find the first region to exist on our test server, then submit requests to it
          for (HRegionLocation regionLocation : table.getRegionLocator().getAllRegionLocations()) {
            if (regionLocation.getServerName().equals(loadServer)) {
              submitRequestsToRegion(table, regionLocation.getRegion());
              return;
            }
          }
          throw new RuntimeException("Failed to find any regions for loadServer " + loadServer);
        }
      } catch (Exception e) {
        LOG.warn("Failed to load data", e);
        // Readers poll the reference, so a plain set is all that is needed to publish the result.
        THROWN_EXCEPTION.set(e);
      }
    };
  }

  /**
   * Writes to a single row of {@code regionInfo} in an endless loop; the only way out is an
   * exception. The first {@link RetriesExhaustedException} caused by a
   * {@link RegionServerAbortedException} is absorbed and switches {@link #shouldThrowTooBig} on.
   * Any other or later {@link IOException} is rethrown.
   * @param table      table to write through
   * @param regionInfo region that should receive the writes
   * @throws IOException the failure that ended the loop
   */
  private void submitRequestsToRegion(Table table, RegionInfo regionInfo) throws IOException {
    // We will block closes of the regions with a CP, so no need to worry about the region getting
    // reassigned. Just use the same rowkey always.
    byte[] rowKey = getRowKeyWithin(regionInfo);

    int i = 0;
    while (true) {
      try {
        i++;
        table.put(new Put(rowKey).addColumn(CF, Bytes.toBytes(i), Bytes.toBytes(i)));
      } catch (IOException e) {
        // only catch RegionServerAbortedException once. After that, the next exception thrown
        // is our test case
        if (
          !shouldThrowTooBig && e instanceof RetriesExhaustedException
            && e.getCause() instanceof RegionServerAbortedException
        ) {
          shouldThrowTooBig = true;
        } else {
          throw e;
        }
      }

      // small sleep to relieve pressure
      Threads.sleep(10);
    }
  }

  /**
   * Picks a row key to write to for the given region.
   * @param regionInfo region to pick a key for
   * @return the region's start key when it has one; otherwise a copy of the end key with its last
   *         byte decremented; or {@code Bytes.toBytes(1)} when the region has neither key
   */
  private byte[] getRowKeyWithin(RegionInfo regionInfo) {
    if (regionInfo.getStartKey().length != 0) {
      return regionInfo.getStartKey();
    }
    // region is start of table, find one after start key
    if (regionInfo.getEndKey().length == 0) {
      // doesn't matter, single region table
      return Bytes.toBytes(1);
    }
    // find a row just before endkey
    byte[] rowKey = Bytes.copy(regionInfo.getEndKey());
    rowKey[rowKey.length - 1]--;
    return rowKey;
  }

  /**
   * Region observer whose {@code preClose} hook does not return until the writer thread has
   * published an exception.
   */
  public static class SleepWhenCloseCoprocessor implements RegionCoprocessor, RegionObserver {

    public SleepWhenCloseCoprocessor() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
      throws IOException {
      // Wait so that the region can't close until we get the information we need from our test
      UTIL.waitFor(60_000, () -> THROWN_EXCEPTION.get() != null);
    }
  }
}