001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022
023import java.io.IOException;
024import java.util.Optional;
025import java.util.concurrent.atomic.AtomicReference;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.HBaseTestingUtility;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.HRegionLocation;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.StartMiniClusterOption;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
035import org.apache.hadoop.hbase.client.Connection;
036import org.apache.hadoop.hbase.client.ConnectionFactory;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.client.RetriesExhaustedException;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.coprocessor.ObserverContext;
044import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
046import org.apache.hadoop.hbase.coprocessor.RegionObserver;
047import org.apache.hadoop.hbase.ipc.CallRunner;
048import org.apache.hadoop.hbase.ipc.PluggableBlockingQueue;
049import org.apache.hadoop.hbase.ipc.PriorityFunction;
050import org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.testclassification.RegionServerTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.JVMClusterUtil;
055import org.apache.hadoop.hbase.util.Threads;
056import org.junit.AfterClass;
057import org.junit.BeforeClass;
058import org.junit.ClassRule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064@Category({ RegionServerTests.class, MediumTests.class })
065public class TestRegionServerRejectDuringAbort {
066
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069    HBaseClassTestRule.forClass(TestRegionServerRejectDuringAbort.class);
070
071  private static final Logger LOG =
072    LoggerFactory.getLogger(TestRegionServerRejectDuringAbort.class);
073
074  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
075
076  private static TableName TABLE_NAME = TableName.valueOf("RSRejectOnAbort");
077
078  private static byte[] CF = Bytes.toBytes("cf");
079
080  private static final int REGIONS_NUM = 5;
081
082  private static final AtomicReference<Exception> THROWN_EXCEPTION = new AtomicReference<>(null);
083
084  private static volatile boolean shouldThrowTooBig = false;
085
086  @BeforeClass
087  public static void setUp() throws Exception {
088    // Will schedule a abort timeout task after SLEEP_TIME_WHEN_CLOSE_REGION ms
089    UTIL.getConfiguration().set("hbase.ipc.server.callqueue.type", "pluggable");
090    UTIL.getConfiguration().setClass("hbase.ipc.server.callqueue.pluggable.queue.class.name",
091      CallQueueTooBigThrowingQueue.class, PluggableBlockingQueue.class);
092    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build();
093    UTIL.startMiniCluster(option);
094    TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME)
095      .setCoprocessor(SleepWhenCloseCoprocessor.class.getName())
096      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF).build()).build();
097    UTIL.getAdmin().createTable(td, Bytes.toBytes("0"), Bytes.toBytes("9"), REGIONS_NUM);
098  }
099
100  public static final class CallQueueTooBigThrowingQueue extends TestPluggableQueueImpl {
101
102    public CallQueueTooBigThrowingQueue(int maxQueueLength, PriorityFunction priority,
103      Configuration conf) {
104      super(maxQueueLength, priority, conf);
105    }
106
107    @Override
108    public boolean offer(CallRunner callRunner) {
109      if (shouldThrowTooBig && callRunner.getRpcCall().getRequestAttribute("test") != null) {
110        return false;
111      }
112      return super.offer(callRunner);
113    }
114  }
115
116  @AfterClass
117  public static void tearDown() throws Exception {
118    UTIL.shutdownMiniCluster();
119  }
120
121  /**
122   * Tests that the logic in ServerRpcConnection works such that if the server is aborted, it short
123   * circuits any other logic. This means we no longer even attempt to enqueue the request onto the
124   * call queue. We verify this by using a special call queue which we can trigger to always return
125   * CallQueueTooBigException. If the logic works, despite forcing those exceptions, we should not
126   * see them.
127   */
128  @Test
129  public void testRejectRequestsOnAbort() throws Exception {
130    // We don't want to disrupt the server carrying meta, because we plan to disrupt requests to
131    // the server. Disrupting meta requests messes with the test.
132    HRegionServer serverWithoutMeta = null;
133    for (JVMClusterUtil.RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
134      .getRegionServerThreads()) {
135      HRegionServer regionServer = regionServerThread.getRegionServer();
136      if (
137        regionServer.getRegions(TableName.META_TABLE_NAME).isEmpty()
138          && !regionServer.getRegions(TABLE_NAME).isEmpty()
139      ) {
140        serverWithoutMeta = regionServer;
141        break;
142      }
143    }
144
145    assertNotNull("couldn't find a server without meta, but with test table regions",
146      serverWithoutMeta);
147
148    Thread writer = new Thread(getWriterThreadRunnable(serverWithoutMeta.getServerName()));
149    writer.setDaemon(true);
150    writer.start();
151
152    // Trigger the abort. Our WriterThread will detect the first RegionServerAbortedException
153    // and trigger our custom queue to reject any more requests. This would typically result in
154    // CallQueueTooBigException, unless our logic in ServerRpcConnection to preempt the processing
155    // of a request is working.
156    serverWithoutMeta.abort("Abort RS for test");
157
158    UTIL.waitFor(60_000, () -> THROWN_EXCEPTION.get() != null);
159    assertEquals(THROWN_EXCEPTION.get().getCause().getClass(), RegionServerAbortedException.class);
160  }
161
162  private Runnable getWriterThreadRunnable(ServerName loadServer) {
163    return () -> {
164      try {
165        Configuration conf = UTIL.getConfiguration();
166        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
167        try (Connection conn = ConnectionFactory.createConnection(conf);
168          Table table = conn.getTableBuilder(TABLE_NAME, null)
169            .setRequestAttribute("test", new byte[] { 0 }).build()) {
170          // find the first region to exist on our test server, then submit requests to it
171          for (HRegionLocation regionLocation : table.getRegionLocator().getAllRegionLocations()) {
172            if (regionLocation.getServerName().equals(loadServer)) {
173              submitRequestsToRegion(table, regionLocation.getRegion());
174              return;
175            }
176          }
177          throw new RuntimeException("Failed to find any regions for loadServer " + loadServer);
178        }
179      } catch (Exception e) {
180        LOG.warn("Failed to load data", e);
181        synchronized (THROWN_EXCEPTION) {
182          THROWN_EXCEPTION.set(e);
183          THROWN_EXCEPTION.notifyAll();
184        }
185      }
186    };
187  }
188
189  private void submitRequestsToRegion(Table table, RegionInfo regionInfo) throws IOException {
190    // We will block closes of the regions with a CP, so no need to worry about the region getting
191    // reassigned. Just use the same rowkey always.
192    byte[] rowKey = getRowKeyWithin(regionInfo);
193
194    int i = 0;
195    while (true) {
196      try {
197        i++;
198        table.put(new Put(rowKey).addColumn(CF, Bytes.toBytes(i), Bytes.toBytes(i)));
199      } catch (IOException e) {
200        // only catch RegionServerAbortedException once. After that, the next exception thrown
201        // is our test case
202        if (
203          !shouldThrowTooBig && e instanceof RetriesExhaustedException
204            && e.getCause() instanceof RegionServerAbortedException
205        ) {
206          shouldThrowTooBig = true;
207        } else {
208          throw e;
209        }
210      }
211
212      // small sleep to relieve pressure
213      Threads.sleep(10);
214    }
215  }
216
217  private byte[] getRowKeyWithin(RegionInfo regionInfo) {
218    byte[] rowKey;
219    // region is start of table, find one after start key
220    if (regionInfo.getStartKey().length == 0) {
221      if (regionInfo.getEndKey().length == 0) {
222        // doesn't matter, single region table
223        return Bytes.toBytes(1);
224      } else {
225        // find a row just before endkey
226        rowKey = Bytes.copy(regionInfo.getEndKey());
227        rowKey[rowKey.length - 1]--;
228        return rowKey;
229      }
230    } else {
231      return regionInfo.getStartKey();
232    }
233  }
234
235  public static class SleepWhenCloseCoprocessor implements RegionCoprocessor, RegionObserver {
236
237    public SleepWhenCloseCoprocessor() {
238    }
239
240    @Override
241    public Optional<RegionObserver> getRegionObserver() {
242      return Optional.of(this);
243    }
244
245    @Override
246    public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
247      throws IOException {
248      // Wait so that the region can't close until we get the information we need from our test
249      UTIL.waitFor(60_000, () -> THROWN_EXCEPTION.get() != null);
250    }
251  }
252}