001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertTrue; 021import static org.junit.Assert.fail; 022 023import java.io.IOException; 024import java.net.SocketTimeoutException; 025import java.util.Comparator; 026import java.util.HashMap; 027import java.util.Map; 028import java.util.Objects; 029import java.util.Random; 030import java.util.SortedMap; 031import java.util.concurrent.CompletableFuture; 032import java.util.concurrent.ConcurrentSkipListMap; 033import java.util.concurrent.ExecutionException; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.concurrent.atomic.AtomicLong; 037import org.apache.commons.lang3.NotImplementedException; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.conf.Configured; 040import org.apache.hadoop.hbase.CellComparatorImpl; 041import org.apache.hadoop.hbase.DoNotRetryIOException; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HBaseConfiguration; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.HRegionInfo; 046import org.apache.hadoop.hbase.HRegionLocation; 047import org.apache.hadoop.hbase.KeyValue; 048import org.apache.hadoop.hbase.MetaCellComparator; 049import org.apache.hadoop.hbase.MetaTableAccessor; 050import org.apache.hadoop.hbase.RegionLocations; 051import org.apache.hadoop.hbase.RegionTooBusyException; 052import org.apache.hadoop.hbase.ServerName; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 055import org.apache.hadoop.hbase.security.User; 056import org.apache.hadoop.hbase.testclassification.ClientTests; 057import org.apache.hadoop.hbase.testclassification.SmallTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 060import org.apache.hadoop.hbase.util.Pair; 061import org.apache.hadoop.hbase.util.Threads; 062import org.apache.hadoop.util.Tool; 063import org.apache.hadoop.util.ToolRunner; 064import org.junit.Before; 065import org.junit.ClassRule; 066import org.junit.Ignore; 067import org.junit.Test; 068import org.junit.experimental.categories.Category; 069import org.mockito.Mockito; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072 073import org.apache.hbase.thirdparty.com.google.common.base.Stopwatch; 074import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 075import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 076import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 077import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 078 079import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 086import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface; 087import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 094import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 095import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 096import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 097import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 099import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 100import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 101import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 102import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 103 104/** 105 * Test client behavior w/o setting up a cluster. Mock up cluster emissions. See below for a method 106 * that tests retries/timeouts currently commented out. 107 */ 108@Category({ ClientTests.class, SmallTests.class }) 109public class TestClientNoCluster extends Configured implements Tool { 110 111 @ClassRule 112 public static final HBaseClassTestRule CLASS_RULE = 113 HBaseClassTestRule.forClass(TestClientNoCluster.class); 114 115 private static final Logger LOG = LoggerFactory.getLogger(TestClientNoCluster.class); 116 private Configuration conf; 117 /** 118 * A server that does not exist. I've changed the server in the below to 'localhost' so we have a 119 * servername that resolves -- otherwise, we just fail on server name lookup with UnknownHost... 120 * With localhost, was able to reproduce stack traces that looked like production stack traces. 121 * Was useful figuring out how retry/timeouts are functioning. 122 */ 123 public static final ServerName META_SERVERNAME = 124 ServerName.valueOf("meta.example.org", 16010, 12345); 125 126 @Before 127 public void setUp() throws Exception { 128 this.conf = HBaseConfiguration.create(); 129 // Run my Connection overrides. Use my little ConnectionImplementation below which 130 // allows me insert mocks and also use my Registry below rather than the default zk based 131 // one so tests run faster and don't have zk dependency. 132 this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName()); 133 } 134 135 /** 136 * Simple cluster registry inserted in place of our usual zookeeper based one. 137 */ 138 static class SimpleRegistry extends DoNothingConnectionRegistry { 139 final ServerName META_HOST = META_SERVERNAME; 140 141 public SimpleRegistry(Configuration conf, User user) { 142 super(conf, user); 143 } 144 145 @Override 146 public CompletableFuture<RegionLocations> getMetaRegionLocations() { 147 return CompletableFuture.completedFuture(new RegionLocations( 148 new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, META_HOST))); 149 } 150 151 @Override 152 public CompletableFuture<String> getClusterId() { 153 return CompletableFuture.completedFuture(HConstants.CLUSTER_ID_DEFAULT); 154 } 155 } 156 157 /** 158 * Remove the @Ignore to try out timeout and retry settings 159 */ 160 @Ignore 161 @Test 162 public void testTimeoutAndRetries() throws IOException { 163 Configuration localConfig = HBaseConfiguration.create(this.conf); 164 // This override mocks up our exists/get call to throw a RegionServerStoppedException. 165 localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName()); 166 // localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 7); 167 Connection connection = ConnectionFactory.createConnection(localConfig); 168 Table table = connection.getTable(TableName.META_TABLE_NAME); 169 Throwable t = null; 170 LOG.info("Start"); 171 try { 172 // An exists call turns into a get w/ a flag. 173 table.exists(new Get(Bytes.toBytes("abc"))); 174 } catch (SocketTimeoutException e) { 175 // I expect this exception. 176 LOG.info("Got expected exception", e); 177 t = e; 178 } finally { 179 table.close(); 180 } 181 connection.close(); 182 LOG.info("Stop"); 183 assertTrue(t != null); 184 } 185 186 /** 187 * Remove the @Ignore to try out timeout and retry settings 188 */ 189 // @Ignore 190 @Test 191 public void testAsyncTimeoutAndRetries() 192 throws IOException, ExecutionException, InterruptedException { 193 Configuration localConfig = HBaseConfiguration.create(this.conf); 194 localConfig.set(ConnectionFactory.HBASE_CLIENT_ASYNC_CONNECTION_IMPL, 195 RpcTimeoutAsyncConnection.class.getName()); 196 localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 9); 197 AsyncConnection connection = ConnectionFactory.createAsyncConnection(localConfig).get(); 198 AsyncTable table = connection.getTable(TableName.META_TABLE_NAME); 199 Throwable t = null; 200 LOG.info("Start"); 201 try { 202 // An exists call turns into a get w/ a flag. 203 table.exists(new Get(Bytes.toBytes("abc"))).get(); 204 } catch (Throwable throwable) { 205 // What to catch? 206 t = throwable; 207 } finally { 208 connection.close(); 209 } 210 LOG.info("Stop"); 211 assertTrue(t != null); 212 } 213 214 /** 215 * Test that operation timeout prevails over rpc default timeout and retries, etc. 216 */ 217 @Test 218 public void testRpcTimeout() throws IOException { 219 Configuration localConfig = HBaseConfiguration.create(this.conf); 220 // This override mocks up our exists/get call to throw a RegionServerStoppedException. 221 localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName()); 222 int pause = 10; 223 localConfig.setInt("hbase.client.pause", pause); 224 localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 10); 225 // Set the operation timeout to be < the pause. Expectation is that after first pause, we will 226 // fail out of the rpc because the rpc timeout will have been set to the operation tiemout 227 // and it has expired. Otherwise, if this functionality is broke, all retries will be run -- 228 // all ten of them -- and we'll get the RetriesExhaustedException exception. 229 localConfig.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, pause - 1); 230 Connection connection = ConnectionFactory.createConnection(localConfig); 231 Table table = connection.getTable(TableName.META_TABLE_NAME); 232 Throwable t = null; 233 try { 234 // An exists call turns into a get w/ a flag. 235 table.exists(new Get(Bytes.toBytes("abc"))); 236 } catch (SocketTimeoutException e) { 237 // I expect this exception. 238 LOG.info("Got expected exception", e); 239 t = e; 240 } finally { 241 table.close(); 242 connection.close(); 243 } 244 assertTrue(t != null); 245 } 246 247 @Test 248 public void testDoNotRetryMetaTableAccessor() throws IOException { 249 this.conf.set("hbase.client.connection.impl", 250 RegionServerStoppedOnScannerOpenConnection.class.getName()); 251 try (Connection connection = ConnectionFactory.createConnection(conf)) { 252 MetaTableAccessor.fullScanRegions(connection); 253 } 254 } 255 256 @Test 257 public void testDoNotRetryOnScanNext() throws IOException { 258 this.conf.set("hbase.client.connection.impl", 259 RegionServerStoppedOnScannerOpenConnection.class.getName()); 260 // Go against meta else we will try to find first region for the table on construction which 261 // means we'll have to do a bunch more mocking. Tests that go against meta only should be 262 // good for a bit of testing. 263 Connection connection = ConnectionFactory.createConnection(this.conf); 264 Table table = connection.getTable(TableName.META_TABLE_NAME); 265 ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY); 266 try { 267 Result result = null; 268 while ((result = scanner.next()) != null) { 269 LOG.info(Objects.toString(result)); 270 } 271 } finally { 272 scanner.close(); 273 table.close(); 274 connection.close(); 275 } 276 } 277 278 @Test 279 public void testRegionServerStoppedOnScannerOpen() throws IOException { 280 this.conf.set("hbase.client.connection.impl", 281 RegionServerStoppedOnScannerOpenConnection.class.getName()); 282 // Go against meta else we will try to find first region for the table on construction which 283 // means we'll have to do a bunch more mocking. Tests that go against meta only should be 284 // good for a bit of testing. 285 Connection connection = ConnectionFactory.createConnection(conf); 286 Table table = connection.getTable(TableName.META_TABLE_NAME); 287 ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY); 288 try { 289 Result result = null; 290 while ((result = scanner.next()) != null) { 291 LOG.info(Objects.toString(result)); 292 } 293 } finally { 294 scanner.close(); 295 table.close(); 296 connection.close(); 297 } 298 } 299 300 @Test 301 public void testConnectionClosedOnRegionLocate() throws IOException { 302 Configuration testConf = new Configuration(this.conf); 303 testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 304 // Go against meta else we will try to find first region for the table on construction which 305 // means we'll have to do a bunch more mocking. Tests that go against meta only should be 306 // good for a bit of testing. 307 Connection connection = ConnectionFactory.createConnection(testConf); 308 Table table = connection.getTable(TableName.META_TABLE_NAME); 309 connection.close(); 310 try { 311 Get get = new Get(Bytes.toBytes("dummyRow")); 312 table.get(get); 313 fail("Should have thrown DoNotRetryException but no exception thrown"); 314 } catch (Exception e) { 315 if (!(e instanceof DoNotRetryIOException)) { 316 String errMsg = 317 "Should have thrown DoNotRetryException but actually " + e.getClass().getSimpleName(); 318 LOG.error(errMsg, e); 319 fail(errMsg); 320 } 321 } finally { 322 table.close(); 323 } 324 } 325 326 /** 327 * Override to shutdown going to zookeeper for cluster id and meta location. 328 */ 329 static class RegionServerStoppedOnScannerOpenConnection extends ConnectionImplementation { 330 final ClientService.BlockingInterface stub; 331 332 RegionServerStoppedOnScannerOpenConnection(Configuration conf, ExecutorService pool, User user, 333 Map<String, byte[]> requestAttributes) throws IOException { 334 super(conf, pool, user, requestAttributes); 335 // Mock up my stub so open scanner returns a scanner id and then on next, we throw 336 // exceptions for three times and then after that, we return no more to scan. 337 this.stub = Mockito.mock(ClientService.BlockingInterface.class); 338 long sid = 12345L; 339 try { 340 Mockito 341 .when(stub.scan((RpcController) Mockito.any(), (ClientProtos.ScanRequest) Mockito.any())) 342 .thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()) 343 .thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))) 344 .thenReturn( 345 ClientProtos.ScanResponse.newBuilder().setScannerId(sid).setMoreResults(false).build()); 346 } catch (ServiceException e) { 347 throw new IOException(e); 348 } 349 } 350 351 @Override 352 public BlockingInterface getClient(ServerName sn) throws IOException { 353 return this.stub; 354 } 355 } 356 357 /** 358 * Override to check we are setting rpc timeout right. 359 */ 360 static class RpcTimeoutConnection extends ConnectionImplementation { 361 final ClientService.BlockingInterface stub; 362 363 RpcTimeoutConnection(Configuration conf, ExecutorService pool, User user, 364 Map<String, byte[]> requestAttributes) throws IOException { 365 super(conf, pool, user, requestAttributes); 366 // Mock up my stub so an exists call -- which turns into a get -- throws an exception 367 this.stub = Mockito.mock(ClientService.BlockingInterface.class); 368 try { 369 Mockito 370 .when(stub.get((RpcController) Mockito.any(), (ClientProtos.GetRequest) Mockito.any())) 371 .thenThrow(new ServiceException(new java.net.ConnectException("Connection refused"))); 372 } catch (ServiceException e) { 373 throw new IOException(e); 374 } 375 } 376 377 @Override 378 public BlockingInterface getClient(ServerName sn) throws IOException { 379 return this.stub; 380 } 381 } 382 383 /** 384 * Override to check we are setting rpc timeout right. 385 */ 386 static class RpcTimeoutAsyncConnection extends AsyncConnectionImpl { 387 RpcTimeoutAsyncConnection(Configuration configuration, ConnectionRegistry registry, 388 String clusterId, User user, Map<String, byte[]> connectionAttributes) { 389 super(configuration, registry, clusterId, user, connectionAttributes); 390 } 391 } 392 393 /** 394 * Fake many regionservers and many regions on a connection implementation. 395 */ 396 static class ManyServersManyRegionsConnection extends ConnectionImplementation { 397 // All access should be synchronized 398 final Map<ServerName, ClientService.BlockingInterface> serversByClient; 399 400 /** 401 * Map of faked-up rows of a 'meta table'. 402 */ 403 final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta; 404 final AtomicLong sequenceids = new AtomicLong(0); 405 private final Configuration conf; 406 407 ManyServersManyRegionsConnection(Configuration conf, ExecutorService pool, User user, 408 Map<String, byte[]> requestAttributes) throws IOException { 409 super(conf, pool, user, requestAttributes); 410 int serverCount = conf.getInt("hbase.test.servers", 10); 411 this.serversByClient = new HashMap<>(serverCount); 412 this.meta = 413 makeMeta(Bytes.toBytes(conf.get("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE))), 414 conf.getInt("hbase.test.regions", 100), conf.getLong("hbase.test.namespace.span", 1000), 415 serverCount); 416 this.conf = conf; 417 } 418 419 @Override 420 public ClientService.BlockingInterface getClient(ServerName sn) throws IOException { 421 // if (!sn.toString().startsWith("meta")) LOG.info(sn); 422 ClientService.BlockingInterface stub = null; 423 synchronized (this.serversByClient) { 424 stub = this.serversByClient.get(sn); 425 if (stub == null) { 426 stub = new FakeServer(this.conf, meta, sequenceids); 427 this.serversByClient.put(sn, stub); 428 } 429 } 430 return stub; 431 } 432 } 433 434 static MultiResponse doMultiResponse(final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta, 435 final AtomicLong sequenceids, final MultiRequest request) { 436 // Make a response to match the request. Act like there were no failures. 437 ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder(); 438 // Per Region. 439 RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder(); 440 ResultOrException.Builder roeBuilder = ResultOrException.newBuilder(); 441 for (RegionAction regionAction : request.getRegionActionList()) { 442 regionActionResultBuilder.clear(); 443 // Per Action in a Region. 444 for (ClientProtos.Action action : regionAction.getActionList()) { 445 roeBuilder.clear(); 446 // Return empty Result and proper index as result. 447 roeBuilder.setResult(ClientProtos.Result.getDefaultInstance()); 448 roeBuilder.setIndex(action.getIndex()); 449 regionActionResultBuilder.addResultOrException(roeBuilder.build()); 450 } 451 builder.addRegionActionResult(regionActionResultBuilder.build()); 452 } 453 return builder.build(); 454 } 455 456 /** 457 * Fake 'server'. Implements the ClientService responding as though it were a 'server' (presumes a 458 * new ClientService.BlockingInterface made per server). 459 */ 460 static class FakeServer implements ClientService.BlockingInterface { 461 private AtomicInteger multiInvocationsCount = new AtomicInteger(0); 462 private final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta; 463 private final AtomicLong sequenceids; 464 private final long multiPause; 465 private final int tooManyMultiRequests; 466 467 FakeServer(final Configuration c, final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta, 468 final AtomicLong sequenceids) { 469 this.meta = meta; 470 this.sequenceids = sequenceids; 471 472 // Pause to simulate the server taking time applying the edits. This will drive up the 473 // number of threads used over in client. 474 this.multiPause = c.getLong("hbase.test.multi.pause.when.done", 0); 475 this.tooManyMultiRequests = c.getInt("hbase.test.multi.too.many", 3); 476 } 477 478 @Override 479 public GetResponse get(RpcController controller, GetRequest request) throws ServiceException { 480 boolean metaRegion = 481 isMetaRegion(request.getRegion().getValue().toByteArray(), request.getRegion().getType()); 482 if (!metaRegion) { 483 return doGetResponse(request); 484 } 485 return doMetaGetResponse(meta, request); 486 } 487 488 private GetResponse doGetResponse(GetRequest request) { 489 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder(); 490 ByteString row = request.getGet().getRow(); 491 resultBuilder.addCell(getStartCode(row)); 492 GetResponse.Builder builder = GetResponse.newBuilder(); 493 builder.setResult(resultBuilder.build()); 494 return builder.build(); 495 } 496 497 @Override 498 public MutateResponse mutate(RpcController controller, MutateRequest request) 499 throws ServiceException { 500 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 501 } 502 503 @Override 504 public ScanResponse scan(RpcController controller, ScanRequest request) 505 throws ServiceException { 506 // Presume it is a scan of meta for now. Not all scans provide a region spec expecting 507 // the server to keep reference by scannerid. TODO. 508 return doMetaScanResponse(meta, sequenceids, request); 509 } 510 511 @Override 512 public BulkLoadHFileResponse bulkLoadHFile(RpcController controller, 513 BulkLoadHFileRequest request) throws ServiceException { 514 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 515 } 516 517 @Override 518 public CoprocessorServiceResponse execService(RpcController controller, 519 CoprocessorServiceRequest request) throws ServiceException { 520 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 521 } 522 523 @Override 524 public MultiResponse multi(RpcController controller, MultiRequest request) 525 throws ServiceException { 526 int concurrentInvocations = this.multiInvocationsCount.incrementAndGet(); 527 try { 528 if (concurrentInvocations >= tooManyMultiRequests) { 529 throw new ServiceException( 530 new RegionTooBusyException("concurrentInvocations=" + concurrentInvocations)); 531 } 532 Threads.sleep(multiPause); 533 return doMultiResponse(meta, sequenceids, request); 534 } finally { 535 this.multiInvocationsCount.decrementAndGet(); 536 } 537 } 538 539 @Override 540 public CoprocessorServiceResponse execRegionServerService(RpcController controller, 541 CoprocessorServiceRequest request) throws ServiceException { 542 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 543 } 544 545 @Override 546 public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller, 547 PrepareBulkLoadRequest request) throws ServiceException { 548 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 549 } 550 551 @Override 552 public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller, 553 CleanupBulkLoadRequest request) throws ServiceException { 554 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 555 } 556 } 557 558 static ScanResponse doMetaScanResponse( 559 final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta, final AtomicLong sequenceids, 560 final ScanRequest request) { 561 ScanResponse.Builder builder = ScanResponse.newBuilder(); 562 int max = request.getNumberOfRows(); 563 int count = 0; 564 Map<byte[], Pair<HRegionInfo, ServerName>> tail = 565 request.hasScan() ? meta.tailMap(request.getScan().getStartRow().toByteArray()) : meta; 566 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder(); 567 for (Map.Entry<byte[], Pair<HRegionInfo, ServerName>> e : tail.entrySet()) { 568 // Can be 0 on open of a scanner -- i.e. rpc to setup scannerid only. 569 if (max <= 0) break; 570 if (++count > max) break; 571 HRegionInfo hri = e.getValue().getFirst(); 572 ByteString row = UnsafeByteOperations.unsafeWrap(hri.getRegionName()); 573 resultBuilder.clear(); 574 resultBuilder.addCell(getRegionInfo(row, hri)); 575 resultBuilder.addCell(getServer(row, e.getValue().getSecond())); 576 resultBuilder.addCell(getStartCode(row)); 577 builder.addResults(resultBuilder.build()); 578 // Set more to false if we are on the last region in table. 579 if (hri.getEndKey().length <= 0) builder.setMoreResults(false); 580 else builder.setMoreResults(true); 581 } 582 // If no scannerid, set one. 583 builder.setScannerId( 584 request.hasScannerId() ? request.getScannerId() : sequenceids.incrementAndGet()); 585 return builder.build(); 586 } 587 588 static GetResponse doMetaGetResponse(final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta, 589 final GetRequest request) { 590 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder(); 591 ByteString row = request.getGet().getRow(); 592 Pair<HRegionInfo, ServerName> p = meta.get(row.toByteArray()); 593 if (p != null) { 594 resultBuilder.addCell(getRegionInfo(row, p.getFirst())); 595 resultBuilder.addCell(getServer(row, p.getSecond())); 596 } 597 resultBuilder.addCell(getStartCode(row)); 598 GetResponse.Builder builder = GetResponse.newBuilder(); 599 builder.setResult(resultBuilder.build()); 600 return builder.build(); 601 } 602 603 /** 604 * @param name region name or encoded region name. 605 * @return True if we are dealing with a hbase:meta region. 606 */ 607 static boolean isMetaRegion(final byte[] name, final RegionSpecifierType type) { 608 switch (type) { 609 case REGION_NAME: 610 return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), name); 611 case ENCODED_REGION_NAME: 612 return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), name); 613 default: 614 throw new UnsupportedOperationException(); 615 } 616 } 617 618 private final static ByteString CATALOG_FAMILY_BYTESTRING = 619 UnsafeByteOperations.unsafeWrap(HConstants.CATALOG_FAMILY); 620 private final static ByteString REGIONINFO_QUALIFIER_BYTESTRING = 621 UnsafeByteOperations.unsafeWrap(HConstants.REGIONINFO_QUALIFIER); 622 private final static ByteString SERVER_QUALIFIER_BYTESTRING = 623 UnsafeByteOperations.unsafeWrap(HConstants.SERVER_QUALIFIER); 624 625 static CellProtos.Cell.Builder getBaseCellBuilder(final ByteString row) { 626 CellProtos.Cell.Builder cellBuilder = CellProtos.Cell.newBuilder(); 627 cellBuilder.setRow(row); 628 cellBuilder.setFamily(CATALOG_FAMILY_BYTESTRING); 629 cellBuilder.setTimestamp(EnvironmentEdgeManager.currentTime()); 630 return cellBuilder; 631 } 632 633 static CellProtos.Cell getRegionInfo(final ByteString row, final HRegionInfo hri) { 634 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); 635 cellBuilder.setQualifier(REGIONINFO_QUALIFIER_BYTESTRING); 636 cellBuilder.setValue(UnsafeByteOperations.unsafeWrap(hri.toByteArray())); 637 return cellBuilder.build(); 638 } 639 640 static CellProtos.Cell getServer(final ByteString row, final ServerName sn) { 641 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); 642 cellBuilder.setQualifier(SERVER_QUALIFIER_BYTESTRING); 643 cellBuilder.setValue(ByteString.copyFromUtf8(sn.getAddress().toString())); 644 return cellBuilder.build(); 645 } 646 647 static CellProtos.Cell getStartCode(final ByteString row) { 648 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); 649 cellBuilder.setQualifier(UnsafeByteOperations.unsafeWrap(HConstants.STARTCODE_QUALIFIER)); 650 // TODO: 651 cellBuilder 652 .setValue(UnsafeByteOperations.unsafeWrap(Bytes.toBytes(META_SERVERNAME.getStartcode()))); 653 return cellBuilder.build(); 654 } 655 656 private static final byte[] BIG_USER_TABLE = Bytes.toBytes("t"); 657 658 /** 659 * Format passed integer. Zero-pad. Copied from hbase-server PE class and small amendment. Make 660 * them share. 661 * @return Returns zero-prefixed 10-byte wide decimal version of passed number (Does absolute in 662 * case number is negative). 663 */ 664 private static byte[] format(final long number) { 665 byte[] b = new byte[10]; 666 long d = number; 667 for (int i = b.length - 1; i >= 0; i--) { 668 b[i] = (byte) ((d % 10) + '0'); 669 d /= 10; 670 } 671 return b; 672 } 673 674 /** Returns <code>count</code> regions */ 675 private static HRegionInfo[] makeHRegionInfos(final byte[] tableName, final int count, 676 final long namespaceSpan) { 677 byte[] startKey = HConstants.EMPTY_BYTE_ARRAY; 678 byte[] endKey = HConstants.EMPTY_BYTE_ARRAY; 679 long interval = namespaceSpan / count; 680 HRegionInfo[] hris = new HRegionInfo[count]; 681 for (int i = 0; i < count; i++) { 682 if (i == 0) { 683 endKey = format(interval); 684 } else { 685 startKey = endKey; 686 if (i == count - 1) endKey = HConstants.EMPTY_BYTE_ARRAY; 687 else endKey = format((i + 1) * interval); 688 } 689 hris[i] = new HRegionInfo(TableName.valueOf(tableName), startKey, endKey); 690 } 691 return hris; 692 } 693 694 /** Returns Return <code>count</code> servernames. */ 695 private static ServerName[] makeServerNames(final int count) { 696 ServerName[] sns = new ServerName[count]; 697 for (int i = 0; i < count; i++) { 698 sns[i] = ServerName.valueOf("" + i + ".example.org", 16010, i); 699 } 700 return sns; 701 } 702 703 /** 704 * Comparator for meta row keys. 705 */ 706 private static class MetaRowsComparator implements Comparator<byte[]> { 707 private final CellComparatorImpl delegate = MetaCellComparator.META_COMPARATOR; 708 709 @Override 710 public int compare(byte[] left, byte[] right) { 711 return delegate.compareRows(new KeyValue.KeyOnlyKeyValue(left), right, 0, right.length); 712 } 713 } 714 715 /** 716 * Create up a map that is keyed by meta row name and whose value is the HRegionInfo and 717 * ServerName to return for this row. 718 * @return Map with faked hbase:meta content in it. 719 */ 720 static SortedMap<byte[], Pair<HRegionInfo, ServerName>> makeMeta(final byte[] tableName, 721 final int regionCount, final long namespaceSpan, final int serverCount) { 722 // I need a comparator for meta rows so we sort properly. 723 SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta = 724 new ConcurrentSkipListMap<>(new MetaRowsComparator()); 725 HRegionInfo[] hris = makeHRegionInfos(tableName, regionCount, namespaceSpan); 726 ServerName[] serverNames = makeServerNames(serverCount); 727 int per = regionCount / serverCount; 728 int count = 0; 729 for (HRegionInfo hri : hris) { 730 Pair<HRegionInfo, ServerName> p = new Pair<>(hri, serverNames[count++ / per]); 731 meta.put(hri.getRegionName(), p); 732 } 733 return meta; 734 } 735 736 /** 737 * Code for each 'client' to run. 738 */ 739 static void cycle(int id, final Configuration c, final Connection sharedConnection) 740 throws IOException { 741 long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000); 742 long startTime = EnvironmentEdgeManager.currentTime(); 743 final int printInterval = 100000; 744 Random rd = new Random(id); 745 boolean get = c.getBoolean("hbase.test.do.gets", false); 746 TableName tableName = TableName.valueOf(BIG_USER_TABLE); 747 if (get) { 748 try (Table table = sharedConnection.getTable(tableName)) { 749 Stopwatch stopWatch = Stopwatch.createStarted(); 750 for (int i = 0; i < namespaceSpan; i++) { 751 byte[] b = format(rd.nextLong()); 752 Get g = new Get(b); 753 table.get(g); 754 if (i % printInterval == 0) { 755 LOG.info("Get " + printInterval + "/" 756 + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS)); 757 stopWatch.reset(); 758 stopWatch.start(); 759 } 760 } 761 LOG.info("Finished a cycle putting " + namespaceSpan + " in " 762 + (EnvironmentEdgeManager.currentTime() - startTime) + "ms"); 763 } 764 } else { 765 try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) { 766 Stopwatch stopWatch = Stopwatch.createStarted(); 767 for (int i = 0; i < namespaceSpan; i++) { 768 byte[] b = format(rd.nextLong()); 769 Put p = new Put(b); 770 p.addColumn(HConstants.CATALOG_FAMILY, b, b); 771 mutator.mutate(p); 772 if (i % printInterval == 0) { 773 LOG.info("Put " + printInterval + "/" 774 + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS)); 775 stopWatch.reset(); 776 stopWatch.start(); 777 } 778 } 779 LOG.info("Finished a cycle putting " + namespaceSpan + " in " 780 + (EnvironmentEdgeManager.currentTime() - startTime) + "ms"); 781 } 782 } 783 } 784 785 @Override 786 public int run(String[] arg0) throws Exception { 787 int errCode = 0; 788 // TODO: Make command options. 789 // How many servers to fake. 790 final int servers = 1; 791 // How many regions to put on the faked servers. 792 final int regions = 100000; 793 // How many 'keys' in the faked regions. 794 final long namespaceSpan = 50000000; 795 // How long to take to pause after doing a put; make this long if you want to fake a struggling 796 // server. 797 final long multiPause = 0; 798 // Check args make basic sense. 799 if ((namespaceSpan < regions) || (regions < servers)) { 800 throw new IllegalArgumentException("namespaceSpan=" + namespaceSpan + " must be > regions=" 801 + regions + " which must be > servers=" + servers); 802 } 803 804 // Set my many servers and many regions faking connection in place. 805 getConf().set("hbase.client.connection.impl", ManyServersManyRegionsConnection.class.getName()); 806 // Use simple kv registry rather than zk 807 getConf().set("hbase.client.registry.impl", SimpleRegistry.class.getName()); 808 // When to report fails. Default is we report the 10th. This means we'll see log everytime 809 // an exception is thrown -- usually RegionTooBusyException when we have more than 810 // hbase.test.multi.too.many requests outstanding at any time. 811 getConf().setInt("hbase.client.start.log.errors.counter", 0); 812 813 // Ugly but this is only way to pass in configs.into ManyServersManyRegionsConnection class. 814 getConf().setInt("hbase.test.regions", regions); 815 getConf().setLong("hbase.test.namespace.span", namespaceSpan); 816 getConf().setLong("hbase.test.servers", servers); 817 getConf().set("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE)); 818 getConf().setLong("hbase.test.multi.pause.when.done", multiPause); 819 // Let there be ten outstanding requests at a time before we throw RegionBusyException. 820 getConf().setInt("hbase.test.multi.too.many", 10); 821 final int clients = 2; 822 823 // Share a connection so I can keep counts in the 'server' on concurrency. 824 final Connection sharedConnection = ConnectionFactory.createConnection(getConf()); 825 try { 826 Thread[] ts = new Thread[clients]; 827 for (int j = 0; j < ts.length; j++) { 828 final int id = j; 829 ts[j] = new Thread("" + j) { 830 final Configuration c = getConf(); 831 832 @Override 833 public void run() { 834 try { 835 cycle(id, c, sharedConnection); 836 } catch (IOException e) { 837 LOG.info("Exception in cycle " + id, e); 838 } 839 } 840 }; 841 ts[j].start(); 842 } 843 for (int j = 0; j < ts.length; j++) { 844 ts[j].join(); 845 } 846 } finally { 847 sharedConnection.close(); 848 } 849 return errCode; 850 } 851 852 /** 853 * Run a client instance against a faked up server. 854 * @param args TODO 855 */ 856 public static void main(String[] args) throws Exception { 857 System.exit(ToolRunner.run(HBaseConfiguration.create(), new TestClientNoCluster(), args)); 858 } 859}