001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import com.codahale.metrics.Counter;
025import java.io.IOException;
026import java.util.HashMap;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Optional;
030import java.util.concurrent.CountDownLatch;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicLong;
035import java.util.concurrent.atomic.AtomicReference;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtility;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.HTableDescriptor;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.NotServingRegionException;
044import org.apache.hadoop.hbase.RegionLocations;
045import org.apache.hadoop.hbase.StartMiniClusterOption;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.coprocessor.ObserverContext;
048import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
049import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
050import org.apache.hadoop.hbase.coprocessor.RegionObserver;
051import org.apache.hadoop.hbase.regionserver.HRegionServer;
052import org.apache.hadoop.hbase.regionserver.InternalScanner;
053import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
054import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
055import org.apache.hadoop.hbase.testclassification.ClientTests;
056import org.apache.hadoop.hbase.testclassification.LargeTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.zookeeper.KeeperException;
059import org.junit.After;
060import org.junit.AfterClass;
061import org.junit.Before;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
070import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
072
073/**
074 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole cluster.
075 * See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
076 */
077@Category({ LargeTests.class, ClientTests.class })
078public class TestReplicasClient {
079
080  @ClassRule
081  public static final HBaseClassTestRule CLASS_RULE =
082    HBaseClassTestRule.forClass(TestReplicasClient.class);
083
084  private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class);
085
086  private static TableName TABLE_NAME;
087  private Table table = null;
088  private static final byte[] row = TestReplicasClient.class.getName().getBytes();
089
090  private static RegionInfo hriPrimary;
091  private static RegionInfo hriSecondary;
092
093  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
094  private static final byte[] f = HConstants.CATALOG_FAMILY;
095
096  private final static int REFRESH_PERIOD = 1000;
097
098  /**
099   * This copro is used to synchronize the tests.
100   */
101  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
102    static final AtomicInteger primaryCountOfScan = new AtomicInteger(0);
103    static final AtomicInteger secondaryCountOfScan = new AtomicInteger(0);
104    static final AtomicLong sleepTime = new AtomicLong(0);
105    static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
106    static final AtomicInteger countOfNext = new AtomicInteger(0);
107    private static final AtomicReference<CountDownLatch> primaryCdl =
108      new AtomicReference<>(new CountDownLatch(0));
109    private static final AtomicReference<CountDownLatch> secondaryCdl =
110      new AtomicReference<>(new CountDownLatch(0));
111
112    public SlowMeCopro() {
113    }
114
115    @Override
116    public Optional<RegionObserver> getRegionObserver() {
117      return Optional.of(this);
118    }
119
120    @Override
121    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
122      final List<Cell> results) throws IOException {
123      slowdownCode(e);
124    }
125
126    private void incrementScanCount(ObserverContext<RegionCoprocessorEnvironment> e) {
127      LOG.info("==========scan {} ", e.getEnvironment().getRegion().getRegionInfo().getReplicaId(),
128        new Exception());
129      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
130        primaryCountOfScan.incrementAndGet();
131      } else {
132        secondaryCountOfScan.incrementAndGet();
133      }
134    }
135
136    @Override
137    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
138      final Scan scan) throws IOException {
139      incrementScanCount(e);
140      slowdownCode(e);
141    }
142
143    @Override
144    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
145      final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore)
146      throws IOException {
147      incrementScanCount(e);
148      // this will slow down a certain next operation if the conditions are met. The slowness
149      // will allow the call to go to a replica
150      if (slowDownNext.get()) {
151        // have some "next" return successfully from the primary; hence countOfNext checked
152        if (countOfNext.incrementAndGet() == 2) {
153          sleepTime.set(2000);
154          slowdownCode(e);
155        }
156      }
157      return true;
158    }
159
160    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
161      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
162        LOG.info("We're the primary replicas.");
163        CountDownLatch latch = getPrimaryCdl().get();
164        try {
165          if (sleepTime.get() > 0) {
166            LOG.info("Sleeping for " + sleepTime.get() + " ms");
167            Thread.sleep(sleepTime.get());
168          } else if (latch.getCount() > 0) {
169            LOG.info("Waiting for the counterCountDownLatch");
170            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
171            if (latch.getCount() > 0) {
172              throw new RuntimeException("Can't wait more");
173            }
174          }
175        } catch (InterruptedException e1) {
176          LOG.error(e1.toString(), e1);
177        }
178      } else {
179        LOG.info("We're not the primary replicas.");
180        CountDownLatch latch = getSecondaryCdl().get();
181        try {
182          if (latch.getCount() > 0) {
183            LOG.info("Waiting for the secondary counterCountDownLatch");
184            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
185            if (latch.getCount() > 0) {
186              throw new RuntimeException("Can't wait more");
187            }
188          }
189        } catch (InterruptedException e1) {
190          LOG.error(e1.toString(), e1);
191        }
192      }
193    }
194
195    public static AtomicReference<CountDownLatch> getPrimaryCdl() {
196      return primaryCdl;
197    }
198
199    public static AtomicReference<CountDownLatch> getSecondaryCdl() {
200      return secondaryCdl;
201    }
202  }
203
204  @BeforeClass
205  public static void beforeClass() throws Exception {
206    // enable store file refreshing
207    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
208      REFRESH_PERIOD);
209    HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
210    HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
211    ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration());
212    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(1)
213      .numAlwaysStandByMasters(1).numMasters(1).build();
214    HTU.startMiniCluster(option);
215
216    // Create table then get the single region for our new table.
217    TABLE_NAME = TableName.valueOf(TestReplicasClient.class.getSimpleName());
218    HTableDescriptor hdt = HTU.createTableDescriptor(TABLE_NAME);
219    hdt.addCoprocessor(SlowMeCopro.class.getName());
220    HTU.createTable(hdt, new byte[][] { f }, null);
221
222    try (RegionLocator locator = HTU.getConnection().getRegionLocator(TABLE_NAME)) {
223      hriPrimary = locator.getRegionLocation(row, false).getRegion();
224    }
225
226    // mock a secondary region info to open
227    hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
228
229    // No master
230    LOG.info("Master is going to be stopped");
231    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
232    Configuration c = new Configuration(HTU.getConfiguration());
233    c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
234    LOG.info("Master has stopped");
235  }
236
  /** Clears the region server's skip-reporting test flag and shuts the mini cluster down. */
  @AfterClass
  public static void afterClass() throws Exception {
    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
    HTU.shutdownMiniCluster();
  }
242
243  @Before
244  public void before() throws IOException {
245    try {
246      openRegion(hriPrimary);
247    } catch (Exception ignored) {
248    }
249    try {
250      openRegion(hriSecondary);
251    } catch (Exception ignored) {
252    }
253    SlowMeCopro.slowDownNext.set(false);
254    SlowMeCopro.sleepTime.set(0);
255    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
256    SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(0));
257    table = HTU.getConnection().getTable(TABLE_NAME);
258    try (ResultScanner scanner = table.getScanner(new Scan())) {
259      for (;;) {
260        Result result = scanner.next();
261        if (result == null) {
262          break;
263        }
264        table.delete(new Delete(result.getRow()));
265      }
266    }
267    flushRegion(hriPrimary);
268    HTU.getConnection().clearRegionLocationCache();
269    SlowMeCopro.primaryCountOfScan.set(0);
270    SlowMeCopro.secondaryCountOfScan.set(0);
271    SlowMeCopro.countOfNext.set(0);
272  }
273
274  @After
275  public void after() throws IOException, KeeperException {
276    SlowMeCopro.getPrimaryCdl().get().countDown();
277    SlowMeCopro.getSecondaryCdl().get().countDown();
278    try {
279      closeRegion(hriSecondary);
280    } catch (Exception ignored) {
281    }
282    try {
283      closeRegion(hriPrimary);
284    } catch (Exception ignored) {
285    }
286    if (table != null) {
287      table.close();
288    }
289    HTU.getConnection().clearRegionLocationCache();
290  }
291
  /** Returns the region server at index 0 of the mini cluster. */
  private HRegionServer getRS() {
    return HTU.getMiniHBaseCluster().getRegionServer(0);
  }
295
296  private void openRegion(RegionInfo hri) throws Exception {
297    try {
298      if (isRegionOpened(hri)) {
299        return;
300      }
301    } catch (Exception e) {
302    }
303    // first version is '0'
304    AdminProtos.OpenRegionRequest orr =
305      RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, null);
306    AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
307    assertEquals(1, responseOpen.getOpeningStateCount());
308    assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED,
309      responseOpen.getOpeningState(0));
310    checkRegionIsOpened(hri);
311  }
312
313  private void closeRegion(RegionInfo hri) throws Exception {
314    AdminProtos.CloseRegionRequest crr =
315      ProtobufUtil.buildCloseRegionRequest(getRS().getServerName(), hri.getRegionName());
316    AdminProtos.CloseRegionResponse responseClose =
317      getRS().getRSRpcServices().closeRegion(null, crr);
318    assertTrue(responseClose.getClosed());
319
320    checkRegionIsClosed(hri.getEncodedName());
321  }
322
323  private void checkRegionIsOpened(RegionInfo hri) throws Exception {
324    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
325      Thread.sleep(1);
326    }
327  }
328
  /**
   * Looks {@code hri} up on the region server by encoded name and reports whether it is
   * available. The lookup may throw instead of returning; callers in this class handle that.
   */
  private boolean isRegionOpened(RegionInfo hri) throws Exception {
    return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
  }
332
333  private void checkRegionIsClosed(String encodedRegionName) throws Exception {
334
335    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
336      Thread.sleep(1);
337    }
338
339    try {
340      assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
341    } catch (NotServingRegionException expected) {
342      // That's how it work: if the region is closed we have an exception.
343    }
344
345    // We don't delete the znode here, because there is not always a znode.
346  }
347
  /** Flushes {@code regionInfo} by delegating to {@link TestRegionServerNoMaster}. */
  private void flushRegion(RegionInfo regionInfo) throws IOException {
    TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
  }
351
352  @Test
353  public void testUseRegionWithoutReplica() throws Exception {
354    byte[] b1 = "testUseRegionWithoutReplica".getBytes();
355    Get g = new Get(b1);
356    Result r = table.get(g);
357    assertFalse(r.isStale());
358  }
359
360  @Test
361  public void testLocations() throws Exception {
362    byte[] b1 = "testLocations".getBytes();
363    ClusterConnection hc = (ClusterConnection) HTU.getAdmin().getConnection();
364    hc.clearRegionLocationCache();
365    RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false);
366    assertEquals(2, rl.size());
367
368    rl = hc.locateRegion(table.getName(), b1, true, false);
369    assertEquals(2, rl.size());
370
371    hc.clearRegionLocationCache();
372    rl = hc.locateRegion(table.getName(), b1, true, false);
373    assertEquals(2, rl.size());
374
375    rl = hc.locateRegion(table.getName(), b1, false, false);
376    assertEquals(2, rl.size());
377  }
378
379  @Test
380  public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
381    byte[] b1 = "testGetNoResultNoStaleRegionWithReplica".getBytes();
382    // A get works and is not stale
383    Get g = new Get(b1);
384    Result r = table.get(g);
385    assertFalse(r.isStale());
386  }
387
388  @Test
389  public void testGetNoResultStaleRegionWithReplica() throws Exception {
390    byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
391    openRegion(hriSecondary);
392
393    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
394    Get g = new Get(b1);
395    g.setConsistency(Consistency.TIMELINE);
396    Result r = table.get(g);
397    assertTrue(r.isStale());
398  }
399
400  @Test
401  public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
402    byte[] b1 = "testGetNoResultNotStaleSleepRegionWithReplica".getBytes();
403    // We sleep; but we won't go to the stale region as we don't get the stale by default.
404    SlowMeCopro.sleepTime.set(2000);
405    Get g = new Get(b1);
406    Result r = table.get(g);
407    assertFalse(r.isStale());
408  }
409
410  @Test
411  public void testFlushTable() throws Exception {
412    flushRegion(hriPrimary);
413    flushRegion(hriSecondary);
414
415    Put p = new Put(row);
416    p.addColumn(f, row, row);
417    table.put(p);
418
419    flushRegion(hriPrimary);
420    flushRegion(hriSecondary);
421  }
422
423  @Test
424  public void testFlushPrimary() throws Exception {
425    flushRegion(hriPrimary);
426
427    Put p = new Put(row);
428    p.addColumn(f, row, row);
429    table.put(p);
430
431    flushRegion(hriPrimary);
432  }
433
434  @Test
435  public void testFlushSecondary() throws Exception {
436    flushRegion(hriSecondary);
437
438    Put p = new Put(row);
439    p.addColumn(f, row, row);
440    table.put(p);
441
442    flushRegion(hriSecondary);
443  }
444
445  @Test
446  public void testUseRegionWithReplica() throws Exception {
447    byte[] b1 = "testUseRegionWithReplica".getBytes();
448    // A simple put works, even if there here a second replica
449    Put p = new Put(b1);
450    p.addColumn(f, b1, b1);
451    table.put(p);
452    LOG.info("Put done");
453
454    // A get works and is not stale
455    Get g = new Get(b1);
456    Result r = table.get(g);
457    assertFalse(r.isStale());
458    assertFalse(r.getColumnCells(f, b1).isEmpty());
459    LOG.info("get works and is not stale done");
460
461    // Even if it we have to wait a little on the main region
462    SlowMeCopro.sleepTime.set(2000);
463    g = new Get(b1);
464    r = table.get(g);
465    assertFalse(r.isStale());
466    assertFalse(r.getColumnCells(f, b1).isEmpty());
467    SlowMeCopro.sleepTime.set(0);
468    LOG.info("sleep and is not stale done");
469
470    // But if we ask for stale we will get it
471    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
472    g = new Get(b1);
473    g.setConsistency(Consistency.TIMELINE);
474    r = table.get(g);
475    assertTrue(r.isStale());
476    assertTrue(r.getColumnCells(f, b1).isEmpty());
477    SlowMeCopro.getPrimaryCdl().get().countDown();
478
479    LOG.info("stale done");
480
481    // exists works and is not stale
482    g = new Get(b1);
483    g.setCheckExistenceOnly(true);
484    r = table.get(g);
485    assertFalse(r.isStale());
486    assertTrue(r.getExists());
487    LOG.info("exists not stale done");
488
489    // exists works on stale but don't see the put
490    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
491    g = new Get(b1);
492    g.setCheckExistenceOnly(true);
493    g.setConsistency(Consistency.TIMELINE);
494    r = table.get(g);
495    assertTrue(r.isStale());
496    assertFalse("The secondary has stale data", r.getExists());
497    SlowMeCopro.getPrimaryCdl().get().countDown();
498    LOG.info("exists stale before flush done");
499
500    flushRegion(hriPrimary);
501    flushRegion(hriSecondary);
502    LOG.info("flush done");
503    Thread.sleep(1000 + REFRESH_PERIOD * 2);
504
505    // get works and is not stale
506    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
507    g = new Get(b1);
508    g.setConsistency(Consistency.TIMELINE);
509    r = table.get(g);
510    assertTrue(r.isStale());
511    assertFalse(r.isEmpty());
512    SlowMeCopro.getPrimaryCdl().get().countDown();
513    LOG.info("stale done");
514
515    // exists works on stale and we see the put after the flush
516    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
517    g = new Get(b1);
518    g.setCheckExistenceOnly(true);
519    g.setConsistency(Consistency.TIMELINE);
520    r = table.get(g);
521    assertTrue(r.isStale());
522    assertTrue(r.getExists());
523    SlowMeCopro.getPrimaryCdl().get().countDown();
524    LOG.info("exists stale after flush done");
525  }
526
527  @Test
528  public void testHedgedRead() throws Exception {
529    byte[] b1 = "testHedgedRead".getBytes();
530    // A simple put works, even if there here a second replica
531    Put p = new Put(b1);
532    p.addColumn(f, b1, b1);
533    table.put(p);
534    LOG.info("Put done");
535
536    // A get works and is not stale
537    Get g = new Get(b1);
538    Result r = table.get(g);
539    assertFalse(r.isStale());
540    assertFalse(r.getColumnCells(f, b1).isEmpty());
541    LOG.info("get works and is not stale done");
542
543    // reset
544    ClusterConnection connection = (ClusterConnection) HTU.getConnection();
545    Counter hedgedReadOps = connection.getConnectionMetrics().getHedgedReadOps();
546    Counter hedgedReadWin = connection.getConnectionMetrics().getHedgedReadWin();
547    hedgedReadOps.dec(hedgedReadOps.getCount());
548    hedgedReadWin.dec(hedgedReadWin.getCount());
549
550    // Wait a little on the main region, just enough to happen once hedged read
551    // and hedged read did not returned faster
552    int primaryCallTimeoutMicroSecond =
553      connection.getConnectionConfiguration().getPrimaryCallTimeoutMicroSecond();
554    SlowMeCopro.sleepTime.set(TimeUnit.MICROSECONDS.toMillis(primaryCallTimeoutMicroSecond));
555    SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
556    g = new Get(b1);
557    g.setConsistency(Consistency.TIMELINE);
558    r = table.get(g);
559    assertFalse(r.isStale());
560    assertFalse(r.getColumnCells(f, b1).isEmpty());
561    assertEquals(1, hedgedReadOps.getCount());
562    assertEquals(0, hedgedReadWin.getCount());
563    SlowMeCopro.sleepTime.set(0);
564    SlowMeCopro.getSecondaryCdl().get().countDown();
565    LOG.info("hedged read occurred but not faster");
566
567    // But if we ask for stale we will get it and hedged read returned faster
568    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
569    g = new Get(b1);
570    g.setConsistency(Consistency.TIMELINE);
571    r = table.get(g);
572    assertTrue(r.isStale());
573    assertTrue(r.getColumnCells(f, b1).isEmpty());
574    assertEquals(2, hedgedReadOps.getCount());
575    assertEquals(1, hedgedReadWin.getCount());
576    SlowMeCopro.getPrimaryCdl().get().countDown();
577    LOG.info("hedged read occurred and faster");
578  }
579
580  @Test
581  public void testScanWithReplicas() throws Exception {
582    // simple scan
583    runMultipleScansOfOneType(false, false);
584  }
585
586  @Test
587  public void testSmallScanWithReplicas() throws Exception {
588    // small scan
589    runMultipleScansOfOneType(false, true);
590  }
591
592  @Test
593  public void testReverseScanWithReplicas() throws Exception {
594    // reverse scan
595    runMultipleScansOfOneType(true, false);
596  }
597
598  @Test
599  public void testCancelOfScan() throws Exception {
600    int numRows = 100;
601    for (int i = 0; i < numRows; i++) {
602      byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
603      Put p = new Put(b1);
604      p.addColumn(f, b1, b1);
605      table.put(p);
606    }
607    LOG.debug("PUT done");
608    int caching = 20;
609    byte[] start;
610    start = Bytes.toBytes("testUseRegionWithReplica" + 0);
611
612    flushRegion(hriPrimary);
613    LOG.info("flush done");
614    Thread.sleep(1000 + REFRESH_PERIOD * 2);
615
616    // now make some 'next' calls slow
617    SlowMeCopro.slowDownNext.set(true);
618    SlowMeCopro.countOfNext.set(0);
619    SlowMeCopro.sleepTime.set(5000);
620
621    Scan scan = new Scan().withStartRow(start);
622    scan.setCaching(caching);
623    scan.setConsistency(Consistency.TIMELINE);
624    ResultScanner scanner = table.getScanner(scan);
625    Iterator<Result> iter = scanner.iterator();
626    iter.next();
627    assertTrue(((ClientScanner) scanner).isAnyRPCcancelled());
628    SlowMeCopro.slowDownNext.set(false);
629    SlowMeCopro.countOfNext.set(0);
630  }
631
632  // make sure the scan will only go to the specific replica
633  @Test
634  public void testScanOnSpecificReplica() throws Exception {
635    Scan scan = new Scan().setReplicaId(1).setConsistency(Consistency.TIMELINE);
636    try (ResultScanner scanner = table.getScanner(scan)) {
637      scanner.next();
638    }
639    assertTrue(SlowMeCopro.secondaryCountOfScan.get() > 0);
640    assertEquals(0, SlowMeCopro.primaryCountOfScan.get());
641  }
642
643  // make sure the scan will only go to the specific replica
644  @Test
645  public void testReverseScanOnSpecificReplica() throws Exception {
646    Scan scan = new Scan().setReversed(true).setReplicaId(1).setConsistency(Consistency.TIMELINE);
647    try (ResultScanner scanner = table.getScanner(scan)) {
648      scanner.next();
649    }
650    assertTrue(SlowMeCopro.secondaryCountOfScan.get() > 0);
651    assertEquals(0, SlowMeCopro.primaryCountOfScan.get());
652  }
653
654  private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception {
655    int numRows = 100;
656    int numCols = 10;
657    for (int i = 0; i < numRows; i++) {
658      byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
659      for (int col = 0; col < numCols; col++) {
660        Put p = new Put(b1);
661        String qualifier = "qualifer" + col;
662        KeyValue kv = new KeyValue(b1, f, qualifier.getBytes());
663        p.add(kv);
664        table.put(p);
665      }
666    }
667    LOG.debug("PUT done");
668    int caching = 20;
669    long maxResultSize = Long.MAX_VALUE;
670
671    byte[] start;
672    if (reversed) {
673      start = Bytes.toBytes("testUseRegionWithReplica" + (numRows - 1));
674    } else {
675      start = Bytes.toBytes("testUseRegionWithReplica" + 0);
676    }
677
678    scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows,
679      numCols, false, false);
680
681    // Even if we were to slow the server down, unless we ask for stale
682    // we won't get it
683    SlowMeCopro.sleepTime.set(5000);
684    scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, numRows,
685      numCols, false, false);
686    SlowMeCopro.sleepTime.set(0);
687
688    flushRegion(hriPrimary);
689    LOG.info("flush done");
690    Thread.sleep(1000 + REFRESH_PERIOD * 2);
691
692    // Now set the flag to get a response even if stale
693    SlowMeCopro.sleepTime.set(5000);
694    scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows,
695      numCols, true, false);
696    SlowMeCopro.sleepTime.set(0);
697
698    // now make some 'next' calls slow
699    SlowMeCopro.slowDownNext.set(true);
700    SlowMeCopro.countOfNext.set(0);
701    scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows,
702      numCols, true, true);
703    SlowMeCopro.slowDownNext.set(false);
704    SlowMeCopro.countOfNext.set(0);
705
706    // Make sure we do not get stale data..
707    SlowMeCopro.sleepTime.set(5000);
708    scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, numRows,
709      numCols, false, false);
710    SlowMeCopro.sleepTime.set(0);
711
712    // While the next calls are slow, set maxResultSize to 1 so that some partial results will be
713    // returned from the server before the replica switch occurs.
714    maxResultSize = 1;
715    SlowMeCopro.slowDownNext.set(true);
716    SlowMeCopro.countOfNext.set(0);
717    scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows,
718      numCols, true, true);
719    maxResultSize = Long.MAX_VALUE;
720    SlowMeCopro.slowDownNext.set(false);
721    SlowMeCopro.countOfNext.set(0);
722  }
723
724  private void scanWithReplicas(boolean reversed, boolean small, Consistency consistency,
725    int caching, long maxResultSize, byte[] startRow, int numRows, int numCols,
726    boolean staleExpected, boolean slowNext) throws Exception {
727    Scan scan = new Scan().withStartRow(startRow);
728    scan.setCaching(caching);
729    scan.setMaxResultSize(maxResultSize);
730    scan.setReversed(reversed);
731    scan.setSmall(small);
732    scan.setConsistency(consistency);
733    ResultScanner scanner = table.getScanner(scan);
734    Iterator<Result> iter = scanner.iterator();
735
736    // Maps of row keys that we have seen so far
737    HashMap<String, Boolean> map = new HashMap<>();
738
739    // Tracked metrics
740    int rowCount = 0;
741    int cellCount = 0;
742    int countOfStale = 0;
743
744    while (iter.hasNext()) {
745      rowCount++;
746      Result r = iter.next();
747      String row = new String(r.getRow());
748
749      if (map.containsKey(row)) {
750        throw new Exception("Unexpected scan result. Repeated row " + Bytes.toString(r.getRow()));
751      }
752
753      map.put(row, true);
754      cellCount += r.rawCells().length;
755
756      if (!slowNext) {
757        assertTrue(r.isStale() == staleExpected);
758      }
759      if (r.isStale()) {
760        countOfStale++;
761      }
762    }
763    assertTrue("Count of rows " + rowCount + " num rows expected " + numRows, rowCount == numRows);
764    assertTrue("Count of cells: " + cellCount + " cells expected: " + numRows * numCols,
765      cellCount == (numRows * numCols));
766
767    if (slowNext) {
768      LOG.debug("Count of Stale " + countOfStale);
769      assertTrue(countOfStale > 1);
770
771      // If the scan was configured in such a way that a full row was NOT retrieved before the
772      // replica switch occurred, then it is possible that all rows were stale
773      if (maxResultSize != Long.MAX_VALUE) {
774        assertTrue(countOfStale <= numRows);
775      } else {
776        assertTrue(countOfStale < numRows);
777      }
778    }
779  }
780}