001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.Arrays; 028import java.util.List; 029import java.util.concurrent.TimeUnit; 030import java.util.stream.Collectors; 031import java.util.stream.IntStream; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.CallQueueTooBigException; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.MultiActionResultTooLarge; 038import org.apache.hadoop.hbase.NotServingRegionException; 039import org.apache.hadoop.hbase.RegionTooBusyException; 040import org.apache.hadoop.hbase.RetryImmediatelyException; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 043import org.apache.hadoop.hbase.exceptions.RegionOpeningException; 044import org.apache.hadoop.hbase.quotas.RpcThrottlingException; 045import org.apache.hadoop.hbase.regionserver.HRegionServer; 046import org.apache.hadoop.hbase.regionserver.RSRpcServices; 047import org.apache.hadoop.hbase.testclassification.ClientTests; 048import org.apache.hadoop.hbase.testclassification.MediumTests; 049import org.apache.hadoop.hbase.util.Bytes; 050import org.junit.After; 051import org.junit.AfterClass; 052import org.junit.BeforeClass; 053import org.junit.ClassRule; 054import org.junit.Test; 055import org.junit.experimental.categories.Category; 056import org.junit.function.ThrowingRunnable; 057 058import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 059import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 060import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 061 062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 065 066@Category({ MediumTests.class, ClientTests.class }) 067public class TestMetaCache { 068 069 @ClassRule 070 public static final HBaseClassTestRule CLASS_RULE = 071 HBaseClassTestRule.forClass(TestMetaCache.class); 072 073 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 074 private static final TableName TABLE_NAME = TableName.valueOf("test_table"); 075 private static final byte[] FAMILY = Bytes.toBytes("fam1"); 076 private static final byte[] QUALIFIER = Bytes.toBytes("qual"); 077 078 private static HRegionServer badRS; 079 080 private Connection conn; 081 private MetricsConnection metrics; 082 private AsyncRegionLocator locator; 083 084 @BeforeClass 085 public static void setUpBeforeClass() throws Exception { 086 Configuration conf = TEST_UTIL.getConfiguration(); 087 conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithFakeRpcServices.class.getName()); 088 TEST_UTIL.startMiniCluster(1); 089 TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); 090 TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME); 091 badRS = TEST_UTIL.getHBaseCluster().getRegionServer(0); 092 assertTrue(badRS.getRSRpcServices() instanceof FakeRSRpcServices); 093 TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME) 094 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(2).build()) 095 .build(); 096 TEST_UTIL.createTable(desc, null); 097 } 098 099 @AfterClass 100 public static void tearDownAfterClass() throws Exception { 101 TEST_UTIL.shutdownMiniCluster(); 102 } 103 104 @After 105 public void tearDown() throws IOException { 106 Closeables.close(conn, true); 107 } 108 109 private void setupConnection(int retry) throws IOException { 110 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 111 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retry); 112 conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); 113 conn = ConnectionFactory.createConnection(conf); 114 AsyncConnectionImpl asyncConn = (AsyncConnectionImpl) conn.toAsyncConnection(); 115 locator = asyncConn.getLocator(); 116 metrics = asyncConn.getConnectionMetrics().get(); 117 } 118 119 /** 120 * Test that our cleanOverlappingRegions doesn't incorrectly remove regions from cache. Originally 121 * encountered when using floorEntry rather than lowerEntry. 122 */ 123 @Test 124 public void testAddToCacheReverse() throws IOException, InterruptedException { 125 setupConnection(1); 126 TableName tableName = TableName.valueOf("testAddToCache"); 127 byte[] family = Bytes.toBytes("CF"); 128 TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName) 129 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build(); 130 int maxSplits = 10; 131 List<byte[]> splits = 132 IntStream.range(1, maxSplits).mapToObj(Bytes::toBytes).collect(Collectors.toList()); 133 134 TEST_UTIL.getAdmin().createTable(td, splits.toArray(new byte[0][])); 135 TEST_UTIL.waitTableAvailable(tableName); 136 TEST_UTIL.waitUntilNoRegionsInTransition(); 137 138 assertEquals(splits.size() + 1, TEST_UTIL.getAdmin().getRegions(tableName).size()); 139 140 RegionLocator locatorForTable = conn.getRegionLocator(tableName); 141 for (int i = maxSplits; i >= 0; i--) { 142 locatorForTable.getRegionLocation(Bytes.toBytes(i)); 143 } 144 145 for (int i = 0; i < maxSplits; i++) { 146 assertNotNull(locator.getRegionLocationInCache(tableName, Bytes.toBytes(i))); 147 } 148 } 149 150 @Test 151 public void testMergeEmptyWithMetaCache() throws Throwable { 152 TableName tableName = TableName.valueOf("testMergeEmptyWithMetaCache"); 153 byte[] family = Bytes.toBytes("CF"); 154 TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName) 155 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build(); 156 TEST_UTIL.getAdmin().createTable(td, new byte[][] { Bytes.toBytes(2), Bytes.toBytes(5) }); 157 TEST_UTIL.waitTableAvailable(tableName); 158 TEST_UTIL.waitUntilNoRegionsInTransition(); 159 RegionInfo regionA = null; 160 RegionInfo regionB = null; 161 RegionInfo regionC = null; 162 for (RegionInfo region : TEST_UTIL.getAdmin().getRegions(tableName)) { 163 if (region.getStartKey().length == 0) { 164 regionA = region; 165 } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(2))) { 166 regionB = region; 167 } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(5))) { 168 regionC = region; 169 } 170 } 171 172 assertNotNull(regionA); 173 assertNotNull(regionB); 174 assertNotNull(regionC); 175 176 TEST_UTIL.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, 177 true); 178 try (AsyncConnection asyncConn = 179 ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) { 180 AsyncConnectionImpl asyncConnImpl = (AsyncConnectionImpl) asyncConn; 181 182 MetricsConnection asyncMetrics = asyncConnImpl.getConnectionMetrics().get(); 183 184 // warm meta cache 185 asyncConn.getRegionLocator(tableName).getAllRegionLocations().get(); 186 187 assertEquals(3, TEST_UTIL.getAdmin().getRegions(tableName).size()); 188 189 // Merge the 3 regions into one 190 TEST_UTIL.getAdmin().mergeRegionsAsync( 191 new byte[][] { regionA.getRegionName(), regionB.getRegionName(), regionC.getRegionName() }, 192 false).get(30, TimeUnit.SECONDS); 193 194 assertEquals(1, TEST_UTIL.getAdmin().getRegions(tableName).size()); 195 196 AsyncTable<?> asyncTable = asyncConn.getTable(tableName); 197 198 // This request should cause us to cache the newly merged region. 199 // As part of caching that region, it should clear out any cached merge parent regions which 200 // are overlapped by the new region. That way, subsequent calls below won't fall into the 201 // bug in HBASE-27650. Otherwise, a request for row 6 would always get stuck on cached 202 // regionB and we'd continue to see cache misses below. 203 assertTrue( 204 executeAndGetNewMisses(() -> asyncTable.get(new Get(Bytes.toBytes(6))).get(), asyncMetrics) 205 > 0); 206 207 // We verify no new cache misses here due to above, which proves we've fixed up the cache 208 assertEquals(0, executeAndGetNewMisses(() -> asyncTable.get(new Get(Bytes.toBytes(6))).get(), 209 asyncMetrics)); 210 } 211 } 212 213 private long executeAndGetNewMisses(ThrowingRunnable runnable, MetricsConnection metrics) 214 throws Throwable { 215 long lastVal = metrics.getMetaCacheMisses(); 216 runnable.run(); 217 long curVal = metrics.getMetaCacheMisses(); 218 return curVal - lastVal; 219 } 220 221 @Test 222 public void testPreserveMetaCacheOnException() throws Exception { 223 ((FakeRSRpcServices) badRS.getRSRpcServices()) 224 .setExceptionInjector(new RoundRobinExceptionInjector()); 225 setupConnection(1); 226 try (Table table = conn.getTable(TABLE_NAME)) { 227 byte[] row = Bytes.toBytes("row1"); 228 229 Put put = new Put(row); 230 put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10)); 231 Get get = new Get(row); 232 Append append = new Append(row); 233 append.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(11)); 234 Increment increment = new Increment(row); 235 increment.addColumn(FAMILY, QUALIFIER, 10); 236 Delete delete = new Delete(row); 237 delete.addColumn(FAMILY, QUALIFIER); 238 RowMutations mutations = new RowMutations(row); 239 mutations.add(put); 240 mutations.add(delete); 241 242 Exception exp; 243 boolean success; 244 for (int i = 0; i < 50; i++) { 245 exp = null; 246 success = false; 247 try { 248 table.put(put); 249 // If at least one operation succeeded, we should have cached the region location. 250 success = true; 251 table.get(get); 252 table.append(append); 253 table.increment(increment); 254 table.delete(delete); 255 table.mutateRow(mutations); 256 } catch (IOException ex) { 257 // Only keep track of the last exception that updated the meta cache 258 if (ClientExceptionsUtil.isMetaClearingException(ex) || success) { 259 exp = ex; 260 } 261 } 262 // Do not test if we did not touch the meta cache in this iteration. 263 if (exp != null && ClientExceptionsUtil.isMetaClearingException(exp)) { 264 assertNull(locator.getRegionLocationInCache(TABLE_NAME, row)); 265 } else if (success) { 266 assertNotNull(locator.getRegionLocationInCache(TABLE_NAME, row)); 267 } 268 } 269 } 270 } 271 272 @Test 273 public void testCacheClearingOnCallQueueTooBig() throws Exception { 274 ((FakeRSRpcServices) badRS.getRSRpcServices()) 275 .setExceptionInjector(new CallQueueTooBigExceptionInjector()); 276 setupConnection(2); 277 Table table = conn.getTable(TABLE_NAME); 278 byte[] row = Bytes.toBytes("row1"); 279 280 Put put = new Put(row); 281 put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10)); 282 table.put(put); 283 284 // obtain the client metrics 285 long preGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount(); 286 long preGetServerClears = metrics.getMetaCacheNumClearServer().getCount(); 287 288 // attempt a get on the test table 289 Get get = new Get(row); 290 try { 291 table.get(get); 292 fail("Expected CallQueueTooBigException"); 293 } catch (RetriesExhaustedException ree) { 294 // expected 295 } 296 297 // verify that no cache clearing took place 298 long postGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount(); 299 long postGetServerClears = metrics.getMetaCacheNumClearServer().getCount(); 300 assertEquals(preGetRegionClears, postGetRegionClears); 301 assertEquals(preGetServerClears, postGetServerClears); 302 } 303 304 public static List<Throwable> metaCachePreservingExceptions() { 305 return Arrays.asList(new RegionOpeningException(" "), 306 new RegionTooBusyException("Some old message"), new RpcThrottlingException(" "), 307 new MultiActionResultTooLarge(" "), new RetryImmediatelyException(" "), 308 new CallQueueTooBigException()); 309 } 310 311 public static class RegionServerWithFakeRpcServices extends HRegionServer { 312 private FakeRSRpcServices rsRpcServices; 313 314 public RegionServerWithFakeRpcServices(Configuration conf) 315 throws IOException, InterruptedException { 316 super(conf); 317 } 318 319 @Override 320 protected RSRpcServices createRpcServices() throws IOException { 321 this.rsRpcServices = new FakeRSRpcServices(this); 322 return rsRpcServices; 323 } 324 325 public void setExceptionInjector(ExceptionInjector injector) { 326 rsRpcServices.setExceptionInjector(injector); 327 } 328 } 329 330 public static class FakeRSRpcServices extends RSRpcServices { 331 332 private ExceptionInjector exceptions; 333 334 public FakeRSRpcServices(HRegionServer rs) throws IOException { 335 super(rs); 336 exceptions = new RoundRobinExceptionInjector(); 337 } 338 339 public void setExceptionInjector(ExceptionInjector injector) { 340 this.exceptions = injector; 341 } 342 343 @Override 344 public GetResponse get(final RpcController controller, final ClientProtos.GetRequest request) 345 throws ServiceException { 346 exceptions.throwOnGet(this, request); 347 return super.get(controller, request); 348 } 349 350 @Override 351 public ClientProtos.MutateResponse mutate(final RpcController controller, 352 final ClientProtos.MutateRequest request) throws ServiceException { 353 exceptions.throwOnMutate(this, request); 354 return super.mutate(controller, request); 355 } 356 357 @Override 358 public ClientProtos.ScanResponse scan(final RpcController controller, 359 final ClientProtos.ScanRequest request) throws ServiceException { 360 exceptions.throwOnScan(this, request); 361 return super.scan(controller, request); 362 } 363 } 364 365 public static abstract class ExceptionInjector { 366 protected boolean isTestTable(FakeRSRpcServices rpcServices, 367 HBaseProtos.RegionSpecifier regionSpec) throws ServiceException { 368 try { 369 return TABLE_NAME 370 .equals(rpcServices.getRegion(regionSpec).getTableDescriptor().getTableName()); 371 } catch (IOException ioe) { 372 throw new ServiceException(ioe); 373 } 374 } 375 376 public abstract void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) 377 throws ServiceException; 378 379 public abstract void throwOnMutate(FakeRSRpcServices rpcServices, 380 ClientProtos.MutateRequest request) throws ServiceException; 381 382 public abstract void throwOnScan(FakeRSRpcServices rpcServices, 383 ClientProtos.ScanRequest request) throws ServiceException; 384 } 385 386 /** 387 * Rotates through the possible cache clearing and non-cache clearing exceptions for requests. 388 */ 389 public static class RoundRobinExceptionInjector extends ExceptionInjector { 390 private int numReqs = -1; 391 private int expCount = -1; 392 private List<Throwable> metaCachePreservingExceptions = metaCachePreservingExceptions(); 393 394 @Override 395 public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) 396 throws ServiceException { 397 throwSomeExceptions(rpcServices, request.getRegion()); 398 } 399 400 @Override 401 public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) 402 throws ServiceException { 403 throwSomeExceptions(rpcServices, request.getRegion()); 404 } 405 406 @Override 407 public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) 408 throws ServiceException { 409 if (!request.hasScannerId()) { 410 // only handle initial scan requests 411 throwSomeExceptions(rpcServices, request.getRegion()); 412 } 413 } 414 415 /** 416 * Throw some exceptions. Mostly throw exceptions which do not clear meta cache. Periodically 417 * throw NotSevingRegionException which clears the meta cache. 418 */ 419 private void throwSomeExceptions(FakeRSRpcServices rpcServices, 420 HBaseProtos.RegionSpecifier regionSpec) throws ServiceException { 421 if (!isTestTable(rpcServices, regionSpec)) { 422 return; 423 } 424 425 numReqs++; 426 // Succeed every 5 request, throw cache clearing exceptions twice every 5 requests and throw 427 // meta cache preserving exceptions otherwise. 428 if (numReqs % 5 == 0) { 429 return; 430 } else if (numReqs % 5 == 1 || numReqs % 5 == 2) { 431 throw new ServiceException(new NotServingRegionException()); 432 } 433 // Round robin between different special exceptions. 434 // This is not ideal since exception types are not tied to the operation performed here, 435 // But, we don't really care here if we throw MultiActionTooLargeException while doing 436 // single Gets. 437 expCount++; 438 Throwable t = 439 metaCachePreservingExceptions.get(expCount % metaCachePreservingExceptions.size()); 440 throw new ServiceException(t); 441 } 442 } 443 444 /** 445 * Throws CallQueueTooBigException for all gets. 446 */ 447 public static class CallQueueTooBigExceptionInjector extends ExceptionInjector { 448 @Override 449 public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) 450 throws ServiceException { 451 if (isTestTable(rpcServices, request.getRegion())) { 452 throw new ServiceException(new CallQueueTooBigException()); 453 } 454 } 455 456 @Override 457 public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) 458 throws ServiceException { 459 } 460 461 @Override 462 public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) 463 throws ServiceException { 464 } 465 } 466}