001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.Collections;
023import java.util.List;
024import java.util.Map;
025import java.util.Optional;
026import java.util.TreeMap;
027import java.util.concurrent.CountDownLatch;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicLong;
030import java.util.concurrent.atomic.AtomicReference;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.Waiter;
040import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
041import org.apache.hadoop.hbase.coprocessor.ObserverContext;
042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
044import org.apache.hadoop.hbase.coprocessor.RegionObserver;
045import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
046import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
047import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
048import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
049import org.apache.hadoop.hbase.testclassification.ClientTests;
050import org.apache.hadoop.hbase.testclassification.LargeTests;
051import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
054import org.junit.AfterClass;
055import org.junit.Assert;
056import org.junit.BeforeClass;
057import org.junit.ClassRule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063@Category({ LargeTests.class, ClientTests.class })
064public class TestReplicaWithCluster {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068    HBaseClassTestRule.forClass(TestReplicaWithCluster.class);
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestReplicaWithCluster.class);
071
072  private static final int NB_SERVERS = 3;
073  private static final byte[] row = Bytes.toBytes(TestReplicaWithCluster.class.getName());
074  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
075
076  // second minicluster used in testing of replication
077  private static HBaseTestingUtil HTU2;
078  private static final byte[] f = HConstants.CATALOG_FAMILY;
079
080  private final static int REFRESH_PERIOD = 1000;
081  private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;
082
083  /**
084   * This copro is used to synchronize the tests.
085   */
086  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
087    static final AtomicLong sleepTime = new AtomicLong(0);
088    static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(new CountDownLatch(0));
089
090    public SlowMeCopro() {
091    }
092
093    @Override
094    public Optional<RegionObserver> getRegionObserver() {
095      return Optional.of(this);
096    }
097
098    @Override
099    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
100      final Get get, final List<Cell> results) throws IOException {
101
102      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
103        CountDownLatch latch = cdl.get();
104        try {
105          if (sleepTime.get() > 0) {
106            LOG.info("Sleeping for " + sleepTime.get() + " ms");
107            Thread.sleep(sleepTime.get());
108          } else if (latch.getCount() > 0) {
109            LOG.info("Waiting for the counterCountDownLatch");
110            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
111            if (latch.getCount() > 0) {
112              throw new RuntimeException("Can't wait more");
113            }
114          }
115        } catch (InterruptedException e1) {
116          LOG.error(e1.toString(), e1);
117        }
118      } else {
119        LOG.info("We're not the primary replicas.");
120      }
121    }
122  }
123
124  /**
125   * This copro is used to simulate region server down exception for Get and Scan
126   */
127  @CoreCoprocessor
128  public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver {
129
130    public RegionServerStoppedCopro() {
131    }
132
133    @Override
134    public Optional<RegionObserver> getRegionObserver() {
135      return Optional.of(this);
136    }
137
138    @Override
139    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
140      final Get get, final List<Cell> results) throws IOException {
141
142      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
143
144      // Fail for the primary replica and replica 1
145      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
146        LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
147        throw new RegionServerStoppedException(
148          "Server " + e.getEnvironment().getServerName() + " not running");
149      } else {
150        LOG.info("We're replica region " + replicaId);
151      }
152    }
153
154    @Override
155    public void preScannerOpen(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
156      final Scan scan) throws IOException {
157      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
158      // Fail for the primary replica and replica 1
159      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
160        LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
161        throw new RegionServerStoppedException(
162          "Server " + e.getEnvironment().getServerName() + " not running");
163      } else {
164        LOG.info("We're replica region " + replicaId);
165      }
166    }
167  }
168
169  /**
170   * This copro is used to slow down the primary meta region scan a bit
171   */
172  public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro
173    implements RegionCoprocessor, RegionObserver {
174    static boolean slowDownPrimaryMetaScan = false;
175    static boolean throwException = false;
176
177    @Override
178    public Optional<RegionObserver> getRegionObserver() {
179      return Optional.of(this);
180    }
181
182    @Override
183    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
184      final Get get, final List<Cell> results) throws IOException {
185
186      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
187
188      // Fail for the primary replica, but not for meta
189      if (throwException) {
190        if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
191          LOG.info("Get, throw Region Server Stopped Exceptoin for region "
192            + e.getEnvironment().getRegion().getRegionInfo());
193          throw new RegionServerStoppedException(
194            "Server " + e.getEnvironment().getServerName() + " not running");
195        }
196      } else {
197        LOG.info("Get, We're replica region " + replicaId);
198      }
199    }
200
201    @Override
202    public void preScannerOpen(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
203      final Scan scan) throws IOException {
204
205      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
206
207      // Slow down with the primary meta region scan
208      if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
209        if (slowDownPrimaryMetaScan) {
210          LOG.info("Scan with primary meta region, slow down a bit");
211          try {
212            Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
213          } catch (InterruptedException ie) {
214            // Ingore
215          }
216        }
217
218        // Fail for the primary replica
219        if (throwException) {
220          LOG.info("Scan, throw Region Server Stopped Exceptoin for replica "
221            + e.getEnvironment().getRegion().getRegionInfo());
222
223          throw new RegionServerStoppedException(
224            "Server " + e.getEnvironment().getServerName() + " not running");
225        } else {
226          LOG.info("Scan, We're replica region " + replicaId);
227        }
228      } else {
229        LOG.info("Scan, We're replica region " + replicaId);
230      }
231    }
232  }
233
234  @BeforeClass
235  public static void beforeClass() throws Exception {
236    // enable store file refreshing
237    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
238      REFRESH_PERIOD);
239
240    HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
241    HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
242    HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
243    HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
244    HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
245    HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
246    HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);
247
248    // Wait for primary call longer so make sure that it will get exception from the primary call
249    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
250    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);
251
252    // Make sure master does not host system tables.
253    HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none");
254
255    // Set system coprocessor so it can be applied to meta regions
256    HTU.getConfiguration().set("hbase.coprocessor.region.classes",
257      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());
258
259    HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
260      META_SCAN_TIMEOUT_IN_MILLISEC * 1000);
261
262    HTU.startMiniCluster(NB_SERVERS);
263    // Enable meta replica at server side
264    HBaseTestingUtil.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, 2);
265
266    HTU.getHBaseCluster().startMaster();
267  }
268
269  @AfterClass
270  public static void afterClass() throws Exception {
271    if (HTU2 != null) HTU2.shutdownMiniCluster();
272    HTU.shutdownMiniCluster();
273  }
274
275  @Test
276  public void testCreateDeleteTable() throws IOException {
277    // Create table then get the single region for our new table.
278    TableDescriptorBuilder builder =
279      HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
280        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
281        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
282    builder.setRegionReplication(NB_SERVERS);
283    builder.setCoprocessor(SlowMeCopro.class.getName());
284    TableDescriptor hdt = builder.build();
285    Table table = HTU.createTable(hdt, new byte[][] { f }, null);
286
287    Put p = new Put(row);
288    p.addColumn(f, row, row);
289    table.put(p);
290
291    Get g = new Get(row);
292    Result r = table.get(g);
293    Assert.assertFalse(r.isStale());
294
295    try {
296      // But if we ask for stale we will get it
297      SlowMeCopro.cdl.set(new CountDownLatch(1));
298      g = new Get(row);
299      g.setConsistency(Consistency.TIMELINE);
300      r = table.get(g);
301      Assert.assertTrue(r.isStale());
302      SlowMeCopro.cdl.get().countDown();
303    } finally {
304      SlowMeCopro.cdl.get().countDown();
305      SlowMeCopro.sleepTime.set(0);
306    }
307
308    HTU.getAdmin().disableTable(hdt.getTableName());
309    HTU.deleteTable(hdt.getTableName());
310  }
311
312  @Test
313  public void testChangeTable() throws Exception {
314    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("testChangeTable"))
315      .setRegionReplication(NB_SERVERS).setCoprocessor(SlowMeCopro.class.getName())
316      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f)).build();
317    HTU.getAdmin().createTable(td);
318    Table table = HTU.getConnection().getTable(td.getTableName());
319    // basic test: it should work.
320    Put p = new Put(row);
321    p.addColumn(f, row, row);
322    table.put(p);
323
324    Get g = new Get(row);
325    Result r = table.get(g);
326    Assert.assertFalse(r.isStale());
327
328    // Add a CF, it should work.
329    TableDescriptor bHdt = HTU.getAdmin().getDescriptor(td.getTableName());
330    td = TableDescriptorBuilder.newBuilder(td)
331      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(row)).build();
332    HTU.getAdmin().disableTable(td.getTableName());
333    HTU.getAdmin().modifyTable(td);
334    HTU.getAdmin().enableTable(td.getTableName());
335    TableDescriptor nHdt = HTU.getAdmin().getDescriptor(td.getTableName());
336    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
337      bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());
338
339    p = new Put(row);
340    p.addColumn(row, row, row);
341    table.put(p);
342
343    g = new Get(row);
344    r = table.get(g);
345    Assert.assertFalse(r.isStale());
346
347    try {
348      SlowMeCopro.cdl.set(new CountDownLatch(1));
349      g = new Get(row);
350      g.setConsistency(Consistency.TIMELINE);
351      r = table.get(g);
352      Assert.assertTrue(r.isStale());
353    } finally {
354      SlowMeCopro.cdl.get().countDown();
355      SlowMeCopro.sleepTime.set(0);
356    }
357
358    Admin admin = HTU.getAdmin();
359    nHdt = admin.getDescriptor(td.getTableName());
360    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
361      bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());
362
363    admin.disableTable(td.getTableName());
364    admin.deleteTable(td.getTableName());
365    admin.close();
366  }
367
368  @SuppressWarnings("deprecation")
369  @Test
370  public void testReplicaAndReplication() throws Exception {
371    TableDescriptorBuilder builder =
372      HTU.createModifyableTableDescriptor("testReplicaAndReplication");
373    builder.setRegionReplication(NB_SERVERS);
374    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(row)
375      .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
376
377    builder.setCoprocessor(SlowMeCopro.class.getName());
378    TableDescriptor tableDescriptor = builder.build();
379    HTU.getAdmin().createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
380
381    Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
382    conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
383    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
384    MiniZooKeeperCluster miniZK = HTU.getZkCluster();
385
386    HTU2 = new HBaseTestingUtil(conf2);
387    HTU2.setZkCluster(miniZK);
388    HTU2.startMiniCluster(NB_SERVERS);
389    LOG.info("Setup second Zk");
390    HTU2.getAdmin().createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
391
392    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
393      Admin admin = connection.getAdmin()) {
394      ReplicationPeerConfig rpc =
395        ReplicationPeerConfig.newBuilder().setClusterKey(HTU2.getRpcConnnectionURI()).build();
396      admin.addReplicationPeer("2", rpc);
397    }
398
399    Put p = new Put(row);
400    p.addColumn(row, row, row);
401    final Table table = HTU.getConnection().getTable(tableDescriptor.getTableName());
402    table.put(p);
403
404    HTU.getAdmin().flush(table.getName());
405    LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");
406
407    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
408      @Override
409      public boolean evaluate() throws Exception {
410        try {
411          SlowMeCopro.cdl.set(new CountDownLatch(1));
412          Get g = new Get(row);
413          g.setConsistency(Consistency.TIMELINE);
414          Result r = table.get(g);
415          Assert.assertTrue(r.isStale());
416          return !r.isEmpty();
417        } finally {
418          SlowMeCopro.cdl.get().countDown();
419          SlowMeCopro.sleepTime.set(0);
420        }
421      }
422    });
423    table.close();
424    LOG.info("stale get on the first cluster done. Now for the second.");
425
426    final Table table2 = HTU.getConnection().getTable(tableDescriptor.getTableName());
427    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
428      @Override
429      public boolean evaluate() throws Exception {
430        try {
431          SlowMeCopro.cdl.set(new CountDownLatch(1));
432          Get g = new Get(row);
433          g.setConsistency(Consistency.TIMELINE);
434          Result r = table2.get(g);
435          Assert.assertTrue(r.isStale());
436          return !r.isEmpty();
437        } finally {
438          SlowMeCopro.cdl.get().countDown();
439          SlowMeCopro.sleepTime.set(0);
440        }
441      }
442    });
443    table2.close();
444
445    HTU.getAdmin().disableTable(tableDescriptor.getTableName());
446    HTU.deleteTable(tableDescriptor.getTableName());
447
448    HTU2.getAdmin().disableTable(tableDescriptor.getTableName());
449    HTU2.deleteTable(tableDescriptor.getTableName());
450
451    // We shutdown HTU2 minicluster later, in afterClass(), as shutting down
452    // the minicluster has negative impact of deleting all HConnections in JVM.
453  }
454
455  @Test
456  public void testBulkLoad() throws IOException {
457    // Create table then get the single region for our new table.
458    LOG.debug("Creating test table");
459    TableDescriptorBuilder builder = HTU.createModifyableTableDescriptor(
460      TableName.valueOf("testBulkLoad"), ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3,
461      HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
462    builder.setRegionReplication(NB_SERVERS);
463    builder.setCoprocessor(SlowMeCopro.class.getName());
464    TableDescriptor hdt = builder.build();
465    Table table = HTU.createTable(hdt, new byte[][] { f }, null);
466
467    // create hfiles to load.
468    LOG.debug("Creating test data");
469    Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad");
470    final int numRows = 10;
471    final byte[] qual = Bytes.toBytes("qual");
472    final byte[] val = Bytes.toBytes("val");
473    Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
474    for (ColumnFamilyDescriptor col : hdt.getColumnFamilies()) {
475      Path hfile = new Path(dir, col.getNameAsString());
476      TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(), qual,
477        val, numRows);
478      family2Files.put(col.getName(), Collections.singletonList(hfile));
479    }
480
481    // bulk load HFiles
482    LOG.debug("Loading test data");
483    BulkLoadHFiles.create(HTU.getConfiguration()).bulkLoad(hdt.getTableName(), family2Files);
484
485    // verify we can read them from the primary
486    LOG.debug("Verifying data load");
487    for (int i = 0; i < numRows; i++) {
488      byte[] row = TestHRegionServerBulkLoad.rowkey(i);
489      Get g = new Get(row);
490      Result r = table.get(g);
491      Assert.assertFalse(r.isStale());
492    }
493
494    // verify we can read them from the replica
495    LOG.debug("Verifying replica queries");
496    try {
497      SlowMeCopro.cdl.set(new CountDownLatch(1));
498      for (int i = 0; i < numRows; i++) {
499        byte[] row = TestHRegionServerBulkLoad.rowkey(i);
500        Get g = new Get(row);
501        g.setConsistency(Consistency.TIMELINE);
502        Result r = table.get(g);
503        Assert.assertTrue(r.isStale());
504      }
505      SlowMeCopro.cdl.get().countDown();
506    } finally {
507      SlowMeCopro.cdl.get().countDown();
508      SlowMeCopro.sleepTime.set(0);
509    }
510
511    HTU.getAdmin().disableTable(hdt.getTableName());
512    HTU.deleteTable(hdt.getTableName());
513  }
514
515  @Test
516  public void testReplicaGetWithPrimaryDown() throws IOException {
517    // Create table then get the single region for our new table.
518    TableDescriptorBuilder builder =
519      HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
520        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
521        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
522    builder.setRegionReplication(NB_SERVERS);
523    builder.setCoprocessor(RegionServerStoppedCopro.class.getName());
524    TableDescriptor hdt = builder.build();
525    try {
526      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
527
528      Put p = new Put(row);
529      p.addColumn(f, row, row);
530      table.put(p);
531
532      // Flush so it can be picked by the replica refresher thread
533      HTU.flush(table.getName());
534
535      // Sleep for some time until data is picked up by replicas
536      try {
537        Thread.sleep(2 * REFRESH_PERIOD);
538      } catch (InterruptedException e1) {
539        LOG.error(e1.toString(), e1);
540      }
541
542      // But if we ask for stale we will get it
543      Get g = new Get(row);
544      g.setConsistency(Consistency.TIMELINE);
545      Result r = table.get(g);
546      Assert.assertTrue(r.isStale());
547    } finally {
548      HTU.getAdmin().disableTable(hdt.getTableName());
549      HTU.deleteTable(hdt.getTableName());
550    }
551  }
552
553  @Test
554  public void testReplicaScanWithPrimaryDown() throws IOException {
555    // Create table then get the single region for our new table.
556    TableDescriptorBuilder builder =
557      HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
558        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
559        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
560    builder.setRegionReplication(NB_SERVERS);
561    builder.setCoprocessor(RegionServerStoppedCopro.class.getName());
562    TableDescriptor hdt = builder.build();
563    try {
564      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
565
566      Put p = new Put(row);
567      p.addColumn(f, row, row);
568      table.put(p);
569
570      // Flush so it can be picked by the replica refresher thread
571      HTU.flush(table.getName());
572
573      // Sleep for some time until data is picked up by replicas
574      try {
575        Thread.sleep(2 * REFRESH_PERIOD);
576      } catch (InterruptedException e1) {
577        LOG.error(e1.toString(), e1);
578      }
579
580      // But if we ask for stale we will get it
581      // Instantiating the Scan class
582      Scan scan = new Scan();
583
584      // Scanning the required columns
585      scan.addFamily(f);
586      scan.setConsistency(Consistency.TIMELINE);
587
588      // Getting the scan result
589      ResultScanner scanner = table.getScanner(scan);
590
591      Result r = scanner.next();
592
593      Assert.assertTrue(r.isStale());
594    } finally {
595      HTU.getAdmin().disableTable(hdt.getTableName());
596      HTU.deleteTable(hdt.getTableName());
597    }
598  }
599
600  @Test
601  public void testReplicaGetWithAsyncRpcClientImpl() throws IOException {
602    HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
603    HTU.getConfiguration().set("hbase.rpc.client.impl",
604      "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
605    // Create table then get the single region for our new table.
606    TableDescriptorBuilder builder =
607      HTU.createModifyableTableDescriptor(TableName.valueOf("testReplicaGetWithAsyncRpcClientImpl"),
608        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
609        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
610    builder.setRegionReplication(NB_SERVERS);
611    builder.setCoprocessor(SlowMeCopro.class.getName());
612    TableDescriptor hdt = builder.build();
613    try {
614      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
615
616      Put p = new Put(row);
617      p.addColumn(f, row, row);
618      table.put(p);
619
620      // Flush so it can be picked by the replica refresher thread
621      HTU.flush(table.getName());
622
623      // Sleep for some time until data is picked up by replicas
624      try {
625        Thread.sleep(2 * REFRESH_PERIOD);
626      } catch (InterruptedException e1) {
627        LOG.error(e1.toString(), e1);
628      }
629
630      try {
631        // Create the new connection so new config can kick in
632        Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
633        Table t = connection.getTable(hdt.getTableName());
634
635        // But if we ask for stale we will get it
636        SlowMeCopro.cdl.set(new CountDownLatch(1));
637        Get g = new Get(row);
638        g.setConsistency(Consistency.TIMELINE);
639        Result r = t.get(g);
640        Assert.assertTrue(r.isStale());
641        SlowMeCopro.cdl.get().countDown();
642      } finally {
643        SlowMeCopro.cdl.get().countDown();
644        SlowMeCopro.sleepTime.set(0);
645      }
646    } finally {
647      HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting");
648      HTU.getConfiguration().unset("hbase.rpc.client.impl");
649      HTU.getAdmin().disableTable(hdt.getTableName());
650      HTU.deleteTable(hdt.getTableName());
651    }
652  }
653}