001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertTrue; 021 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.util.Optional; 025import java.util.concurrent.atomic.AtomicBoolean; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtil; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.NotServingRegionException; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.TableNameTestRule; 036import org.apache.hadoop.hbase.Waiter; 037import org.apache.hadoop.hbase.client.Admin; 038import org.apache.hadoop.hbase.client.Append; 039import org.apache.hadoop.hbase.client.BufferedMutator; 040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 041import org.apache.hadoop.hbase.client.Delete; 042import org.apache.hadoop.hbase.client.Durability; 043import org.apache.hadoop.hbase.client.Increment; 044import org.apache.hadoop.hbase.client.Put; 045import org.apache.hadoop.hbase.client.RegionInfo; 046import org.apache.hadoop.hbase.client.Result; 047import org.apache.hadoop.hbase.client.ResultScanner; 048import org.apache.hadoop.hbase.client.Scan; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.client.TableDescriptor; 051import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 052import org.apache.hadoop.hbase.coprocessor.ObserverContext; 053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 054import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 055import org.apache.hadoop.hbase.coprocessor.RegionObserver; 056import org.apache.hadoop.hbase.exceptions.DeserializationException; 057import org.apache.hadoop.hbase.filter.FilterBase; 058import org.apache.hadoop.hbase.testclassification.LargeTests; 059import org.apache.hadoop.hbase.testclassification.RegionServerTests; 060import org.apache.hadoop.hbase.util.Bytes; 061import org.apache.hadoop.hbase.wal.WAL; 062import org.apache.hadoop.hbase.wal.WALEdit; 063import org.junit.After; 064import org.junit.Before; 065import org.junit.BeforeClass; 066import org.junit.ClassRule; 067import org.junit.Rule; 068import org.junit.Test; 069import org.junit.experimental.categories.Category; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072 073@Category({ RegionServerTests.class, LargeTests.class }) 074public class TestRegionInterrupt { 075 076 @ClassRule 077 public static final HBaseClassTestRule CLASS_RULE = 078 HBaseClassTestRule.forClass(TestRegionInterrupt.class); 079 080 private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 081 private static final Logger LOG = LoggerFactory.getLogger(TestRegionInterrupt.class); 082 083 static final byte[] FAMILY = Bytes.toBytes("info"); 084 085 static long sleepTime; 086 087 @Rule 088 public TableNameTestRule name = new TableNameTestRule(); 089 090 @BeforeClass 091 public static void setUpBeforeClass() throws Exception { 092 Configuration conf = TEST_UTIL.getConfiguration(); 093 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 094 conf.setClass(HConstants.REGION_IMPL, InterruptInterceptingHRegion.class, Region.class); 095 conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true); 096 // Ensure the sleep interval is long enough for interrupts to occur. 097 long waitInterval = 098 conf.getLong(HRegion.CLOSE_WAIT_INTERVAL, HRegion.DEFAULT_CLOSE_WAIT_INTERVAL); 099 sleepTime = waitInterval * 2; 100 // Try to bound the running time of this unit if expected actions do not take place. 101 conf.setLong(HRegion.CLOSE_WAIT_TIME, sleepTime * 2); 102 } 103 104 @Before 105 public void setUp() throws Exception { 106 TEST_UTIL.startMiniCluster(); 107 } 108 109 @After 110 public void tearDown() throws Exception { 111 TEST_UTIL.shutdownMiniCluster(); 112 } 113 114 @Test 115 public void testCloseInterruptScanning() throws Exception { 116 final TableName tableName = name.getTableName(); 117 LOG.info("Creating table " + tableName); 118 try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 119 // load some data 120 TEST_UTIL.waitUntilAllRegionsAssigned(tableName); 121 TEST_UTIL.loadTable(table, FAMILY); 122 final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false); 123 // scan the table in the background 124 Thread scanner = new Thread(new Runnable() { 125 @Override 126 public void run() { 127 Scan scan = new Scan(); 128 scan.addFamily(FAMILY); 129 scan.setFilter(new DelayingFilter()); 130 try { 131 LOG.info("Starting scan"); 132 try (ResultScanner rs = table.getScanner(scan)) { 133 Result r; 134 do { 135 r = rs.next(); 136 if (r != null) { 137 LOG.info("Scanned row " + Bytes.toStringBinary(r.getRow())); 138 } 139 } while (r != null); 140 } 141 } catch (IOException e) { 142 LOG.info("Scanner caught exception", e); 143 expectedExceptionCaught.set(true); 144 } finally { 145 LOG.info("Finished scan"); 146 } 147 } 148 }); 149 scanner.start(); 150 151 // Wait for the filter to begin sleeping 152 LOG.info("Waiting for scanner to start"); 153 Waiter.waitFor(TEST_UTIL.getConfiguration(), 10 * 1000, new Waiter.Predicate<Exception>() { 154 @Override 155 public boolean evaluate() throws Exception { 156 return DelayingFilter.isSleeping(); 157 } 158 }); 159 160 // Offline the table, this will trigger closing 161 LOG.info("Offlining table " + tableName); 162 TEST_UTIL.getAdmin().disableTable(tableName); 163 164 // Wait for scanner termination 165 scanner.join(); 166 167 // When we get here the region has closed and the table is offline 168 assertTrue("Region operations were not interrupted", 169 InterruptInterceptingHRegion.wasInterrupted()); 170 assertTrue("Scanner did not catch expected exception", expectedExceptionCaught.get()); 171 } 172 } 173 174 @Test 175 public void testCloseInterruptMutation() throws Exception { 176 final TableName tableName = name.getTableName(); 177 final Admin admin = TEST_UTIL.getAdmin(); 178 // Create the test table 179 TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName) 180 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 181 .setCoprocessor(MutationDelayingCoprocessor.class.getName()).build(); 182 LOG.info("Creating table " + tableName); 183 admin.createTable(htd); 184 TEST_UTIL.waitUntilAllRegionsAssigned(tableName); 185 186 // Insert some data in the background 187 LOG.info("Starting writes to table " + tableName); 188 final int NUM_ROWS = 100; 189 final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false); 190 Thread inserter = new Thread(new Runnable() { 191 @Override 192 public void run() { 193 try (BufferedMutator t = admin.getConnection().getBufferedMutator(tableName)) { 194 for (int i = 0; i < NUM_ROWS; i++) { 195 LOG.info("Writing row " + i + " to " + tableName); 196 byte[] value = new byte[10], row = Bytes.toBytes(Integer.toString(i)); 197 Bytes.random(value); 198 t.mutate(new Put(row).addColumn(FAMILY, HConstants.EMPTY_BYTE_ARRAY, value)); 199 t.flush(); 200 } 201 } catch (IOException e) { 202 LOG.info("Inserter caught exception", e); 203 expectedExceptionCaught.set(true); 204 } 205 } 206 }); 207 inserter.start(); 208 209 // Wait for delayed insertion to begin 210 LOG.info("Waiting for mutations to start"); 211 Waiter.waitFor(TEST_UTIL.getConfiguration(), 10 * 1000, new Waiter.Predicate<Exception>() { 212 @Override 213 public boolean evaluate() throws Exception { 214 return MutationDelayingCoprocessor.isSleeping(); 215 } 216 }); 217 218 // Offline the table, this will trigger closing 219 LOG.info("Offlining table " + tableName); 220 admin.disableTable(tableName); 221 222 // Wait for the inserter to finish 223 inserter.join(); 224 225 // When we get here the region has closed and the table is offline 226 assertTrue("Region operations were not interrupted", 227 InterruptInterceptingHRegion.wasInterrupted()); 228 assertTrue("Inserter did not catch expected exception", expectedExceptionCaught.get()); 229 230 } 231 232 public static class InterruptInterceptingHRegion extends HRegion { 233 234 private static boolean interrupted = false; 235 236 public static boolean wasInterrupted() { 237 return interrupted; 238 } 239 240 public InterruptInterceptingHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration conf, 241 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 242 super(tableDir, wal, fs, conf, regionInfo, htd, rsServices); 243 } 244 245 public InterruptInterceptingHRegion(HRegionFileSystem fs, WAL wal, Configuration conf, 246 TableDescriptor htd, RegionServerServices rsServices) { 247 super(fs, wal, conf, htd, rsServices); 248 } 249 250 @Override 251 void checkInterrupt() throws NotServingRegionException, InterruptedIOException { 252 try { 253 super.checkInterrupt(); 254 } catch (NotServingRegionException | InterruptedIOException e) { 255 interrupted = true; 256 throw e; 257 } 258 } 259 260 @Override 261 IOException throwOnInterrupt(Throwable t) { 262 interrupted = true; 263 return super.throwOnInterrupt(t); 264 } 265 266 } 267 268 public static class DelayingFilter extends FilterBase { 269 270 static volatile boolean sleeping = false; 271 272 public static boolean isSleeping() { 273 return sleeping; 274 } 275 276 @Override 277 public ReturnCode filterCell(Cell v) throws IOException { 278 LOG.info("Starting sleep on " + v); 279 sleeping = true; 280 try { 281 Thread.sleep(sleepTime); 282 } catch (InterruptedException e) { 283 // restore interrupt status so region scanner can handle it as expected 284 Thread.currentThread().interrupt(); 285 LOG.info("Interrupted during sleep on " + v); 286 } finally { 287 LOG.info("Done sleep on " + v); 288 sleeping = false; 289 } 290 return ReturnCode.INCLUDE; 291 } 292 293 public static DelayingFilter parseFrom(final byte[] pbBytes) throws DeserializationException { 294 // Just return a new instance. 295 return new DelayingFilter(); 296 } 297 298 } 299 300 public static class MutationDelayingCoprocessor implements RegionCoprocessor, RegionObserver { 301 302 static volatile boolean sleeping = false; 303 304 public static boolean isSleeping() { 305 return sleeping; 306 } 307 308 private void doSleep(Region.Operation op) { 309 LOG.info("Starting sleep for " + op); 310 sleeping = true; 311 try { 312 Thread.sleep(sleepTime); 313 } catch (InterruptedException e) { 314 // restore interrupt status so doMiniBatchMutation etc. can handle it as expected 315 Thread.currentThread().interrupt(); 316 LOG.info("Interrupted during " + op); 317 } finally { 318 LOG.info("Done"); 319 sleeping = false; 320 } 321 } 322 323 @Override 324 public Optional<RegionObserver> getRegionObserver() { 325 return Optional.of(this); 326 } 327 328 @Override 329 public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> c, Put put, 330 WALEdit edit, Durability durability) throws IOException { 331 doSleep(Region.Operation.PUT); 332 RegionObserver.super.prePut(c, put, edit, durability); 333 } 334 335 @Override 336 public void preDelete(ObserverContext<? extends RegionCoprocessorEnvironment> c, Delete delete, 337 WALEdit edit, Durability durability) throws IOException { 338 doSleep(Region.Operation.DELETE); 339 RegionObserver.super.preDelete(c, delete, edit, durability); 340 } 341 342 @Override 343 public Result preAppend(ObserverContext<? extends RegionCoprocessorEnvironment> c, 344 Append append) throws IOException { 345 doSleep(Region.Operation.APPEND); 346 return RegionObserver.super.preAppend(c, append); 347 } 348 349 @Override 350 public Result preIncrement(ObserverContext<? extends RegionCoprocessorEnvironment> c, 351 Increment increment) throws IOException { 352 doSleep(Region.Operation.INCREMENT); 353 return RegionObserver.super.preIncrement(c, increment); 354 } 355 356 } 357 358}