001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertTrue;
021import static org.junit.Assert.fail;
022
023import java.io.IOException;
024import java.net.SocketTimeoutException;
025import java.util.Comparator;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.Objects;
029import java.util.Random;
030import java.util.SortedMap;
031import java.util.concurrent.CompletableFuture;
032import java.util.concurrent.ConcurrentSkipListMap;
033import java.util.concurrent.ExecutionException;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicLong;
037import org.apache.commons.lang3.NotImplementedException;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.conf.Configured;
040import org.apache.hadoop.hbase.CellComparatorImpl;
041import org.apache.hadoop.hbase.DoNotRetryIOException;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.HRegionInfo;
046import org.apache.hadoop.hbase.HRegionLocation;
047import org.apache.hadoop.hbase.KeyValue;
048import org.apache.hadoop.hbase.MetaCellComparator;
049import org.apache.hadoop.hbase.MetaTableAccessor;
050import org.apache.hadoop.hbase.RegionLocations;
051import org.apache.hadoop.hbase.RegionTooBusyException;
052import org.apache.hadoop.hbase.ServerName;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
055import org.apache.hadoop.hbase.security.User;
056import org.apache.hadoop.hbase.testclassification.ClientTests;
057import org.apache.hadoop.hbase.testclassification.SmallTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.Pair;
061import org.apache.hadoop.hbase.util.Threads;
062import org.apache.hadoop.util.Tool;
063import org.apache.hadoop.util.ToolRunner;
064import org.junit.Before;
065import org.junit.ClassRule;
066import org.junit.Ignore;
067import org.junit.Test;
068import org.junit.experimental.categories.Category;
069import org.mockito.Mockito;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073import org.apache.hbase.thirdparty.com.google.common.base.Stopwatch;
074import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
075import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
076import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
077import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
078
079import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
086import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
087import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
103
104/**
 * Test client behavior w/o setting up a cluster. Mock up cluster emissions. See
 * {@code testTimeoutAndRetries} below for a retries/timeouts test that is currently disabled
 * with {@code @Ignore}.
107 */
108@Category({ ClientTests.class, SmallTests.class })
109public class TestClientNoCluster extends Configured implements Tool {
110
  /** Class-level JUnit rule built for this test class. */
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestClientNoCluster.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestClientNoCluster.class);
  /** Base configuration; re-created in {@link #setUp()} before each test. */
  private Configuration conf;
  /**
   * A server that does not exist. It is the location {@link SimpleRegistry} reports for the meta
   * region, and its start code is what {@link #getStartCode} puts in faked-up cells.
   * <p>
   * NOTE(review): an earlier version of this comment said the host had been changed to
   * 'localhost' so that the server name resolves (otherwise lookups fail with UnknownHost...) and
   * that, with localhost, stack traces resembling production ones could be reproduced, which was
   * useful for figuring out how retry/timeouts function. The value below is 'meta.example.org',
   * not 'localhost' -- confirm which one is intended.
   */
  public static final ServerName META_SERVERNAME =
    ServerName.valueOf("meta.example.org", 16010, 12345);
125
126  @Before
127  public void setUp() throws Exception {
128    this.conf = HBaseConfiguration.create();
129    // Run my Connection overrides. Use my little ConnectionImplementation below which
130    // allows me insert mocks and also use my Registry below rather than the default zk based
131    // one so tests run faster and don't have zk dependency.
132    this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName());
133  }
134
135  /**
136   * Simple cluster registry inserted in place of our usual zookeeper based one.
137   */
138  static class SimpleRegistry extends DoNothingConnectionRegistry {
139    final ServerName META_HOST = META_SERVERNAME;
140
141    public SimpleRegistry(Configuration conf, User user) {
142      super(conf, user);
143    }
144
145    @Override
146    public CompletableFuture<RegionLocations> getMetaRegionLocations() {
147      return CompletableFuture.completedFuture(new RegionLocations(
148        new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, META_HOST)));
149    }
150
151    @Override
152    public CompletableFuture<String> getClusterId() {
153      return CompletableFuture.completedFuture(HConstants.CLUSTER_ID_DEFAULT);
154    }
155  }
156
157  /**
158   * Remove the @Ignore to try out timeout and retry settings
159   */
160  @Ignore
161  @Test
162  public void testTimeoutAndRetries() throws IOException {
163    Configuration localConfig = HBaseConfiguration.create(this.conf);
164    // This override mocks up our exists/get call to throw a RegionServerStoppedException.
165    localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
166    // localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 7);
167    Connection connection = ConnectionFactory.createConnection(localConfig);
168    Table table = connection.getTable(TableName.META_TABLE_NAME);
169    Throwable t = null;
170    LOG.info("Start");
171    try {
172      // An exists call turns into a get w/ a flag.
173      table.exists(new Get(Bytes.toBytes("abc")));
174    } catch (SocketTimeoutException e) {
175      // I expect this exception.
176      LOG.info("Got expected exception", e);
177      t = e;
178    } finally {
179      table.close();
180    }
181    connection.close();
182    LOG.info("Stop");
183    assertTrue(t != null);
184  }
185
186  /**
187   * Remove the @Ignore to try out timeout and retry settings
188   */
189  // @Ignore
190  @Test
191  public void testAsyncTimeoutAndRetries()
192    throws IOException, ExecutionException, InterruptedException {
193    Configuration localConfig = HBaseConfiguration.create(this.conf);
194    localConfig.set(ConnectionFactory.HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
195      RpcTimeoutAsyncConnection.class.getName());
196    localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 9);
197    AsyncConnection connection = ConnectionFactory.createAsyncConnection(localConfig).get();
198    AsyncTable table = connection.getTable(TableName.META_TABLE_NAME);
199    Throwable t = null;
200    LOG.info("Start");
201    try {
202      // An exists call turns into a get w/ a flag.
203      table.exists(new Get(Bytes.toBytes("abc"))).get();
204    } catch (Throwable throwable) {
205      // What to catch?
206      t = throwable;
207    } finally {
208      connection.close();
209    }
210    LOG.info("Stop");
211    assertTrue(t != null);
212  }
213
214  /**
215   * Test that operation timeout prevails over rpc default timeout and retries, etc.
216   */
217  @Test
218  public void testRpcTimeout() throws IOException {
219    Configuration localConfig = HBaseConfiguration.create(this.conf);
220    // This override mocks up our exists/get call to throw a RegionServerStoppedException.
221    localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
222    int pause = 10;
223    localConfig.setInt("hbase.client.pause", pause);
224    localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 10);
225    // Set the operation timeout to be < the pause. Expectation is that after first pause, we will
226    // fail out of the rpc because the rpc timeout will have been set to the operation tiemout
227    // and it has expired. Otherwise, if this functionality is broke, all retries will be run --
228    // all ten of them -- and we'll get the RetriesExhaustedException exception.
229    localConfig.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, pause - 1);
230    Connection connection = ConnectionFactory.createConnection(localConfig);
231    Table table = connection.getTable(TableName.META_TABLE_NAME);
232    Throwable t = null;
233    try {
234      // An exists call turns into a get w/ a flag.
235      table.exists(new Get(Bytes.toBytes("abc")));
236    } catch (SocketTimeoutException e) {
237      // I expect this exception.
238      LOG.info("Got expected exception", e);
239      t = e;
240    } finally {
241      table.close();
242      connection.close();
243    }
244    assertTrue(t != null);
245  }
246
247  @Test
248  public void testDoNotRetryMetaTableAccessor() throws IOException {
249    this.conf.set("hbase.client.connection.impl",
250      RegionServerStoppedOnScannerOpenConnection.class.getName());
251    try (Connection connection = ConnectionFactory.createConnection(conf)) {
252      MetaTableAccessor.fullScanRegions(connection);
253    }
254  }
255
256  @Test
257  public void testDoNotRetryOnScanNext() throws IOException {
258    this.conf.set("hbase.client.connection.impl",
259      RegionServerStoppedOnScannerOpenConnection.class.getName());
260    // Go against meta else we will try to find first region for the table on construction which
261    // means we'll have to do a bunch more mocking. Tests that go against meta only should be
262    // good for a bit of testing.
263    Connection connection = ConnectionFactory.createConnection(this.conf);
264    Table table = connection.getTable(TableName.META_TABLE_NAME);
265    ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY);
266    try {
267      Result result = null;
268      while ((result = scanner.next()) != null) {
269        LOG.info(Objects.toString(result));
270      }
271    } finally {
272      scanner.close();
273      table.close();
274      connection.close();
275    }
276  }
277
278  @Test
279  public void testRegionServerStoppedOnScannerOpen() throws IOException {
280    this.conf.set("hbase.client.connection.impl",
281      RegionServerStoppedOnScannerOpenConnection.class.getName());
282    // Go against meta else we will try to find first region for the table on construction which
283    // means we'll have to do a bunch more mocking. Tests that go against meta only should be
284    // good for a bit of testing.
285    Connection connection = ConnectionFactory.createConnection(conf);
286    Table table = connection.getTable(TableName.META_TABLE_NAME);
287    ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY);
288    try {
289      Result result = null;
290      while ((result = scanner.next()) != null) {
291        LOG.info(Objects.toString(result));
292      }
293    } finally {
294      scanner.close();
295      table.close();
296      connection.close();
297    }
298  }
299
300  @Test
301  public void testConnectionClosedOnRegionLocate() throws IOException {
302    Configuration testConf = new Configuration(this.conf);
303    testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
304    // Go against meta else we will try to find first region for the table on construction which
305    // means we'll have to do a bunch more mocking. Tests that go against meta only should be
306    // good for a bit of testing.
307    Connection connection = ConnectionFactory.createConnection(testConf);
308    Table table = connection.getTable(TableName.META_TABLE_NAME);
309    connection.close();
310    try {
311      Get get = new Get(Bytes.toBytes("dummyRow"));
312      table.get(get);
313      fail("Should have thrown DoNotRetryException but no exception thrown");
314    } catch (Exception e) {
315      if (!(e instanceof DoNotRetryIOException)) {
316        String errMsg =
317          "Should have thrown DoNotRetryException but actually " + e.getClass().getSimpleName();
318        LOG.error(errMsg, e);
319        fail(errMsg);
320      }
321    } finally {
322      table.close();
323    }
324  }
325
326  /**
327   * Override to shutdown going to zookeeper for cluster id and meta location.
328   */
329  static class RegionServerStoppedOnScannerOpenConnection extends ConnectionImplementation {
330    final ClientService.BlockingInterface stub;
331
332    RegionServerStoppedOnScannerOpenConnection(Configuration conf, ExecutorService pool, User user,
333      Map<String, byte[]> requestAttributes) throws IOException {
334      super(conf, pool, user, requestAttributes);
335      // Mock up my stub so open scanner returns a scanner id and then on next, we throw
336      // exceptions for three times and then after that, we return no more to scan.
337      this.stub = Mockito.mock(ClientService.BlockingInterface.class);
338      long sid = 12345L;
339      try {
340        Mockito
341          .when(stub.scan((RpcController) Mockito.any(), (ClientProtos.ScanRequest) Mockito.any()))
342          .thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build())
343          .thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito")))
344          .thenReturn(
345            ClientProtos.ScanResponse.newBuilder().setScannerId(sid).setMoreResults(false).build());
346      } catch (ServiceException e) {
347        throw new IOException(e);
348      }
349    }
350
351    @Override
352    public BlockingInterface getClient(ServerName sn) throws IOException {
353      return this.stub;
354    }
355  }
356
357  /**
358   * Override to check we are setting rpc timeout right.
359   */
360  static class RpcTimeoutConnection extends ConnectionImplementation {
361    final ClientService.BlockingInterface stub;
362
363    RpcTimeoutConnection(Configuration conf, ExecutorService pool, User user,
364      Map<String, byte[]> requestAttributes) throws IOException {
365      super(conf, pool, user, requestAttributes);
366      // Mock up my stub so an exists call -- which turns into a get -- throws an exception
367      this.stub = Mockito.mock(ClientService.BlockingInterface.class);
368      try {
369        Mockito
370          .when(stub.get((RpcController) Mockito.any(), (ClientProtos.GetRequest) Mockito.any()))
371          .thenThrow(new ServiceException(new java.net.ConnectException("Connection refused")));
372      } catch (ServiceException e) {
373        throw new IOException(e);
374      }
375    }
376
377    @Override
378    public BlockingInterface getClient(ServerName sn) throws IOException {
379      return this.stub;
380    }
381  }
382
  /**
   * Async connection used by {@code testAsyncTimeoutAndRetries}. It only forwards its constructor
   * arguments to {@link AsyncConnectionImpl}; no behavior is overridden here.
   */
  static class RpcTimeoutAsyncConnection extends AsyncConnectionImpl {
    RpcTimeoutAsyncConnection(Configuration configuration, ConnectionRegistry registry,
      String clusterId, User user, Map<String, byte[]> connectionAttributes) {
      super(configuration, registry, clusterId, user, connectionAttributes);
    }
  }
392
393  /**
394   * Fake many regionservers and many regions on a connection implementation.
395   */
396  static class ManyServersManyRegionsConnection extends ConnectionImplementation {
397    // All access should be synchronized
398    final Map<ServerName, ClientService.BlockingInterface> serversByClient;
399
400    /**
401     * Map of faked-up rows of a 'meta table'.
402     */
403    final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta;
404    final AtomicLong sequenceids = new AtomicLong(0);
405    private final Configuration conf;
406
407    ManyServersManyRegionsConnection(Configuration conf, ExecutorService pool, User user,
408      Map<String, byte[]> requestAttributes) throws IOException {
409      super(conf, pool, user, requestAttributes);
410      int serverCount = conf.getInt("hbase.test.servers", 10);
411      this.serversByClient = new HashMap<>(serverCount);
412      this.meta =
413        makeMeta(Bytes.toBytes(conf.get("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE))),
414          conf.getInt("hbase.test.regions", 100), conf.getLong("hbase.test.namespace.span", 1000),
415          serverCount);
416      this.conf = conf;
417    }
418
419    @Override
420    public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
421      // if (!sn.toString().startsWith("meta")) LOG.info(sn);
422      ClientService.BlockingInterface stub = null;
423      synchronized (this.serversByClient) {
424        stub = this.serversByClient.get(sn);
425        if (stub == null) {
426          stub = new FakeServer(this.conf, meta, sequenceids);
427          this.serversByClient.put(sn, stub);
428        }
429      }
430      return stub;
431    }
432  }
433
434  static MultiResponse doMultiResponse(final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta,
435    final AtomicLong sequenceids, final MultiRequest request) {
436    // Make a response to match the request. Act like there were no failures.
437    ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
438    // Per Region.
439    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
440    ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
441    for (RegionAction regionAction : request.getRegionActionList()) {
442      regionActionResultBuilder.clear();
443      // Per Action in a Region.
444      for (ClientProtos.Action action : regionAction.getActionList()) {
445        roeBuilder.clear();
446        // Return empty Result and proper index as result.
447        roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
448        roeBuilder.setIndex(action.getIndex());
449        regionActionResultBuilder.addResultOrException(roeBuilder.build());
450      }
451      builder.addRegionActionResult(regionActionResultBuilder.build());
452    }
453    return builder.build();
454  }
455
456  /**
457   * Fake 'server'. Implements the ClientService responding as though it were a 'server' (presumes a
458   * new ClientService.BlockingInterface made per server).
459   */
460  static class FakeServer implements ClientService.BlockingInterface {
461    private AtomicInteger multiInvocationsCount = new AtomicInteger(0);
462    private final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta;
463    private final AtomicLong sequenceids;
464    private final long multiPause;
465    private final int tooManyMultiRequests;
466
467    FakeServer(final Configuration c, final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta,
468      final AtomicLong sequenceids) {
469      this.meta = meta;
470      this.sequenceids = sequenceids;
471
472      // Pause to simulate the server taking time applying the edits. This will drive up the
473      // number of threads used over in client.
474      this.multiPause = c.getLong("hbase.test.multi.pause.when.done", 0);
475      this.tooManyMultiRequests = c.getInt("hbase.test.multi.too.many", 3);
476    }
477
478    @Override
479    public GetResponse get(RpcController controller, GetRequest request) throws ServiceException {
480      boolean metaRegion =
481        isMetaRegion(request.getRegion().getValue().toByteArray(), request.getRegion().getType());
482      if (!metaRegion) {
483        return doGetResponse(request);
484      }
485      return doMetaGetResponse(meta, request);
486    }
487
488    private GetResponse doGetResponse(GetRequest request) {
489      ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
490      ByteString row = request.getGet().getRow();
491      resultBuilder.addCell(getStartCode(row));
492      GetResponse.Builder builder = GetResponse.newBuilder();
493      builder.setResult(resultBuilder.build());
494      return builder.build();
495    }
496
497    @Override
498    public MutateResponse mutate(RpcController controller, MutateRequest request)
499      throws ServiceException {
500      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
501    }
502
503    @Override
504    public ScanResponse scan(RpcController controller, ScanRequest request)
505      throws ServiceException {
506      // Presume it is a scan of meta for now. Not all scans provide a region spec expecting
507      // the server to keep reference by scannerid. TODO.
508      return doMetaScanResponse(meta, sequenceids, request);
509    }
510
511    @Override
512    public BulkLoadHFileResponse bulkLoadHFile(RpcController controller,
513      BulkLoadHFileRequest request) throws ServiceException {
514      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
515    }
516
517    @Override
518    public CoprocessorServiceResponse execService(RpcController controller,
519      CoprocessorServiceRequest request) throws ServiceException {
520      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
521    }
522
523    @Override
524    public MultiResponse multi(RpcController controller, MultiRequest request)
525      throws ServiceException {
526      int concurrentInvocations = this.multiInvocationsCount.incrementAndGet();
527      try {
528        if (concurrentInvocations >= tooManyMultiRequests) {
529          throw new ServiceException(
530            new RegionTooBusyException("concurrentInvocations=" + concurrentInvocations));
531        }
532        Threads.sleep(multiPause);
533        return doMultiResponse(meta, sequenceids, request);
534      } finally {
535        this.multiInvocationsCount.decrementAndGet();
536      }
537    }
538
539    @Override
540    public CoprocessorServiceResponse execRegionServerService(RpcController controller,
541      CoprocessorServiceRequest request) throws ServiceException {
542      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
543    }
544
545    @Override
546    public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
547      PrepareBulkLoadRequest request) throws ServiceException {
548      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
549    }
550
551    @Override
552    public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
553      CleanupBulkLoadRequest request) throws ServiceException {
554      throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
555    }
556  }
557
558  static ScanResponse doMetaScanResponse(
559    final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta, final AtomicLong sequenceids,
560    final ScanRequest request) {
561    ScanResponse.Builder builder = ScanResponse.newBuilder();
562    int max = request.getNumberOfRows();
563    int count = 0;
564    Map<byte[], Pair<HRegionInfo, ServerName>> tail =
565      request.hasScan() ? meta.tailMap(request.getScan().getStartRow().toByteArray()) : meta;
566    ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
567    for (Map.Entry<byte[], Pair<HRegionInfo, ServerName>> e : tail.entrySet()) {
568      // Can be 0 on open of a scanner -- i.e. rpc to setup scannerid only.
569      if (max <= 0) break;
570      if (++count > max) break;
571      HRegionInfo hri = e.getValue().getFirst();
572      ByteString row = UnsafeByteOperations.unsafeWrap(hri.getRegionName());
573      resultBuilder.clear();
574      resultBuilder.addCell(getRegionInfo(row, hri));
575      resultBuilder.addCell(getServer(row, e.getValue().getSecond()));
576      resultBuilder.addCell(getStartCode(row));
577      builder.addResults(resultBuilder.build());
578      // Set more to false if we are on the last region in table.
579      if (hri.getEndKey().length <= 0) builder.setMoreResults(false);
580      else builder.setMoreResults(true);
581    }
582    // If no scannerid, set one.
583    builder.setScannerId(
584      request.hasScannerId() ? request.getScannerId() : sequenceids.incrementAndGet());
585    return builder.build();
586  }
587
588  static GetResponse doMetaGetResponse(final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta,
589    final GetRequest request) {
590    ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
591    ByteString row = request.getGet().getRow();
592    Pair<HRegionInfo, ServerName> p = meta.get(row.toByteArray());
593    if (p != null) {
594      resultBuilder.addCell(getRegionInfo(row, p.getFirst()));
595      resultBuilder.addCell(getServer(row, p.getSecond()));
596    }
597    resultBuilder.addCell(getStartCode(row));
598    GetResponse.Builder builder = GetResponse.newBuilder();
599    builder.setResult(resultBuilder.build());
600    return builder.build();
601  }
602
603  /**
604   * @param name region name or encoded region name.
605   * @return True if we are dealing with a hbase:meta region.
606   */
607  static boolean isMetaRegion(final byte[] name, final RegionSpecifierType type) {
608    switch (type) {
609      case REGION_NAME:
610        return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), name);
611      case ENCODED_REGION_NAME:
612        return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), name);
613      default:
614        throw new UnsupportedOperationException();
615    }
616  }
617
618  private final static ByteString CATALOG_FAMILY_BYTESTRING =
619    UnsafeByteOperations.unsafeWrap(HConstants.CATALOG_FAMILY);
620  private final static ByteString REGIONINFO_QUALIFIER_BYTESTRING =
621    UnsafeByteOperations.unsafeWrap(HConstants.REGIONINFO_QUALIFIER);
622  private final static ByteString SERVER_QUALIFIER_BYTESTRING =
623    UnsafeByteOperations.unsafeWrap(HConstants.SERVER_QUALIFIER);
624
625  static CellProtos.Cell.Builder getBaseCellBuilder(final ByteString row) {
626    CellProtos.Cell.Builder cellBuilder = CellProtos.Cell.newBuilder();
627    cellBuilder.setRow(row);
628    cellBuilder.setFamily(CATALOG_FAMILY_BYTESTRING);
629    cellBuilder.setTimestamp(EnvironmentEdgeManager.currentTime());
630    return cellBuilder;
631  }
632
633  static CellProtos.Cell getRegionInfo(final ByteString row, final HRegionInfo hri) {
634    CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
635    cellBuilder.setQualifier(REGIONINFO_QUALIFIER_BYTESTRING);
636    cellBuilder.setValue(UnsafeByteOperations.unsafeWrap(hri.toByteArray()));
637    return cellBuilder.build();
638  }
639
640  static CellProtos.Cell getServer(final ByteString row, final ServerName sn) {
641    CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
642    cellBuilder.setQualifier(SERVER_QUALIFIER_BYTESTRING);
643    cellBuilder.setValue(ByteString.copyFromUtf8(sn.getAddress().toString()));
644    return cellBuilder.build();
645  }
646
647  static CellProtos.Cell getStartCode(final ByteString row) {
648    CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
649    cellBuilder.setQualifier(UnsafeByteOperations.unsafeWrap(HConstants.STARTCODE_QUALIFIER));
650    // TODO:
651    cellBuilder
652      .setValue(UnsafeByteOperations.unsafeWrap(Bytes.toBytes(META_SERVERNAME.getStartcode())));
653    return cellBuilder.build();
654  }
655
  /**
   * Name ("t") of the fake user table: the default for "hbase.test.tablename" when the fake meta
   * is built, and the table that {@code cycle} operates on.
   */
  private static final byte[] BIG_USER_TABLE = Bytes.toBytes("t");
657
658  /**
659   * Format passed integer. Zero-pad. Copied from hbase-server PE class and small amendment. Make
660   * them share.
661   * @return Returns zero-prefixed 10-byte wide decimal version of passed number (Does absolute in
662   *         case number is negative).
663   */
664  private static byte[] format(final long number) {
665    byte[] b = new byte[10];
666    long d = number;
667    for (int i = b.length - 1; i >= 0; i--) {
668      b[i] = (byte) ((d % 10) + '0');
669      d /= 10;
670    }
671    return b;
672  }
673
674  /** Returns <code>count</code> regions */
675  private static HRegionInfo[] makeHRegionInfos(final byte[] tableName, final int count,
676    final long namespaceSpan) {
677    byte[] startKey = HConstants.EMPTY_BYTE_ARRAY;
678    byte[] endKey = HConstants.EMPTY_BYTE_ARRAY;
679    long interval = namespaceSpan / count;
680    HRegionInfo[] hris = new HRegionInfo[count];
681    for (int i = 0; i < count; i++) {
682      if (i == 0) {
683        endKey = format(interval);
684      } else {
685        startKey = endKey;
686        if (i == count - 1) endKey = HConstants.EMPTY_BYTE_ARRAY;
687        else endKey = format((i + 1) * interval);
688      }
689      hris[i] = new HRegionInfo(TableName.valueOf(tableName), startKey, endKey);
690    }
691    return hris;
692  }
693
694  /** Returns Return <code>count</code> servernames. */
695  private static ServerName[] makeServerNames(final int count) {
696    ServerName[] sns = new ServerName[count];
697    for (int i = 0; i < count; i++) {
698      sns[i] = ServerName.valueOf("" + i + ".example.org", 16010, i);
699    }
700    return sns;
701  }
702
703  /**
704   * Comparator for meta row keys.
705   */
706  private static class MetaRowsComparator implements Comparator<byte[]> {
707    private final CellComparatorImpl delegate = MetaCellComparator.META_COMPARATOR;
708
709    @Override
710    public int compare(byte[] left, byte[] right) {
711      return delegate.compareRows(new KeyValue.KeyOnlyKeyValue(left), right, 0, right.length);
712    }
713  }
714
715  /**
716   * Create up a map that is keyed by meta row name and whose value is the HRegionInfo and
717   * ServerName to return for this row.
718   * @return Map with faked hbase:meta content in it.
719   */
720  static SortedMap<byte[], Pair<HRegionInfo, ServerName>> makeMeta(final byte[] tableName,
721    final int regionCount, final long namespaceSpan, final int serverCount) {
722    // I need a comparator for meta rows so we sort properly.
723    SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta =
724      new ConcurrentSkipListMap<>(new MetaRowsComparator());
725    HRegionInfo[] hris = makeHRegionInfos(tableName, regionCount, namespaceSpan);
726    ServerName[] serverNames = makeServerNames(serverCount);
727    int per = regionCount / serverCount;
728    int count = 0;
729    for (HRegionInfo hri : hris) {
730      Pair<HRegionInfo, ServerName> p = new Pair<>(hri, serverNames[count++ / per]);
731      meta.put(hri.getRegionName(), p);
732    }
733    return meta;
734  }
735
736  /**
737   * Code for each 'client' to run.
738   */
739  static void cycle(int id, final Configuration c, final Connection sharedConnection)
740    throws IOException {
741    long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000);
742    long startTime = EnvironmentEdgeManager.currentTime();
743    final int printInterval = 100000;
744    Random rd = new Random(id);
745    boolean get = c.getBoolean("hbase.test.do.gets", false);
746    TableName tableName = TableName.valueOf(BIG_USER_TABLE);
747    if (get) {
748      try (Table table = sharedConnection.getTable(tableName)) {
749        Stopwatch stopWatch = Stopwatch.createStarted();
750        for (int i = 0; i < namespaceSpan; i++) {
751          byte[] b = format(rd.nextLong());
752          Get g = new Get(b);
753          table.get(g);
754          if (i % printInterval == 0) {
755            LOG.info("Get " + printInterval + "/"
756              + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS));
757            stopWatch.reset();
758            stopWatch.start();
759          }
760        }
761        LOG.info("Finished a cycle putting " + namespaceSpan + " in "
762          + (EnvironmentEdgeManager.currentTime() - startTime) + "ms");
763      }
764    } else {
765      try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) {
766        Stopwatch stopWatch = Stopwatch.createStarted();
767        for (int i = 0; i < namespaceSpan; i++) {
768          byte[] b = format(rd.nextLong());
769          Put p = new Put(b);
770          p.addColumn(HConstants.CATALOG_FAMILY, b, b);
771          mutator.mutate(p);
772          if (i % printInterval == 0) {
773            LOG.info("Put " + printInterval + "/"
774              + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS));
775            stopWatch.reset();
776            stopWatch.start();
777          }
778        }
779        LOG.info("Finished a cycle putting " + namespaceSpan + " in "
780          + (EnvironmentEdgeManager.currentTime() - startTime) + "ms");
781      }
782    }
783  }
784
785  @Override
786  public int run(String[] arg0) throws Exception {
787    int errCode = 0;
788    // TODO: Make command options.
789    // How many servers to fake.
790    final int servers = 1;
791    // How many regions to put on the faked servers.
792    final int regions = 100000;
793    // How many 'keys' in the faked regions.
794    final long namespaceSpan = 50000000;
795    // How long to take to pause after doing a put; make this long if you want to fake a struggling
796    // server.
797    final long multiPause = 0;
798    // Check args make basic sense.
799    if ((namespaceSpan < regions) || (regions < servers)) {
800      throw new IllegalArgumentException("namespaceSpan=" + namespaceSpan + " must be > regions="
801        + regions + " which must be > servers=" + servers);
802    }
803
804    // Set my many servers and many regions faking connection in place.
805    getConf().set("hbase.client.connection.impl", ManyServersManyRegionsConnection.class.getName());
806    // Use simple kv registry rather than zk
807    getConf().set("hbase.client.registry.impl", SimpleRegistry.class.getName());
808    // When to report fails. Default is we report the 10th. This means we'll see log everytime
809    // an exception is thrown -- usually RegionTooBusyException when we have more than
810    // hbase.test.multi.too.many requests outstanding at any time.
811    getConf().setInt("hbase.client.start.log.errors.counter", 0);
812
813    // Ugly but this is only way to pass in configs.into ManyServersManyRegionsConnection class.
814    getConf().setInt("hbase.test.regions", regions);
815    getConf().setLong("hbase.test.namespace.span", namespaceSpan);
816    getConf().setLong("hbase.test.servers", servers);
817    getConf().set("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE));
818    getConf().setLong("hbase.test.multi.pause.when.done", multiPause);
819    // Let there be ten outstanding requests at a time before we throw RegionBusyException.
820    getConf().setInt("hbase.test.multi.too.many", 10);
821    final int clients = 2;
822
823    // Share a connection so I can keep counts in the 'server' on concurrency.
824    final Connection sharedConnection = ConnectionFactory.createConnection(getConf());
825    try {
826      Thread[] ts = new Thread[clients];
827      for (int j = 0; j < ts.length; j++) {
828        final int id = j;
829        ts[j] = new Thread("" + j) {
830          final Configuration c = getConf();
831
832          @Override
833          public void run() {
834            try {
835              cycle(id, c, sharedConnection);
836            } catch (IOException e) {
837              LOG.info("Exception in cycle " + id, e);
838            }
839          }
840        };
841        ts[j].start();
842      }
843      for (int j = 0; j < ts.length; j++) {
844        ts[j].join();
845      }
846    } finally {
847      sharedConnection.close();
848    }
849    return errCode;
850  }
851
852  /**
853   * Run a client instance against a faked up server.
854   * @param args TODO
855   */
856  public static void main(String[] args) throws Exception {
857    System.exit(ToolRunner.run(HBaseConfiguration.create(), new TestClientNoCluster(), args));
858  }
859}