/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

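/**
 * Tests timeline-consistent (stale) reads from region replicas against a minicluster, covering
 * gets and scans, schema changes, cross-cluster replication, bulk load, and failure scenarios
 * simulated through coprocessors, including meta replica behaviour.
 */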
@Category({ LargeTests.class, ClientTests.class })
public class TestReplicaWithCluster {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicaWithCluster.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicaWithCluster.class);

  private static final int NB_SERVERS = 3;
  private static final byte[] row = TestReplicaWithCluster.class.getName().getBytes();
  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();

  // second minicluster used in testing of replication
  private static HBaseTestingUtility HTU2;
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  private final static int REFRESH_PERIOD = 1000;
  private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;

  @Rule
  public TestName name = new TestName();

  /**
   * This copro is used to synchronize the tests.
   */
  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(new CountDownLatch(0));

    public SlowMeCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
      final List<Cell> results) throws IOException {

      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        CountDownLatch latch = cdl.get();
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          } else if (latch.getCount() > 0) {
            LOG.info("Waiting for the CountDownLatch");
            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
            if (latch.getCount() > 0) {
              throw new RuntimeException("Can't wait more");
            }
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
        }
      } else {
        LOG.info("We're not the primary replica.");
      }
    }
  }

  /**
   * This copro is used to simulate a region server down exception for Get and Scan.
   */
  @CoreCoprocessor
  public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver {

    public RegionServerStoppedCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
      final List<Cell> results) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Fail for the primary replica and replica 1
      if (replicaId <= 1) {
        LOG.info("Throw Region Server Stopped Exception for replica id " + replicaId);
        throw new RegionServerStoppedException(
          "Server " + e.getEnvironment().getServerName() + " not running");
      } else {
        LOG.info("We're replica region " + replicaId);
      }
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Scan scan) throws IOException {
      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
      // Fail for the primary replica and replica 1
      if (replicaId <= 1) {
        LOG.info("Throw Region Server Stopped Exception for replica id " + replicaId);
        throw new RegionServerStoppedException(
          "Server " + e.getEnvironment().getServerName() + " not running");
      } else {
        LOG.info("We're replica region " + replicaId);
      }
    }
  }

  /**
   * This copro is used to slow down the primary meta region scan a bit.
   */
  public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro
    implements RegionCoprocessor, RegionObserver {
    static boolean slowDownPrimaryMetaScan = false;
    static boolean throwException = false;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
      final List<Cell> results) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Fail for the primary replica, but not for meta
      if (throwException) {
        if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
          LOG.info("Get, throw Region Server Stopped Exception for region "
            + e.getEnvironment().getRegion().getRegionInfo());
          throw new RegionServerStoppedException(
            "Server " + e.getEnvironment().getServerName() + " not running");
        }
      } else {
        LOG.info("Get, We're replica region " + replicaId);
      }
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Scan scan) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Slow down the primary meta region scan
      if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
        if (slowDownPrimaryMetaScan) {
          LOG.info("Scan with primary meta region, slow down a bit");
          try {
            Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
          } catch (InterruptedException ie) {
            // Ignore
          }
        }

        // Fail for the primary replica
        if (throwException) {
          LOG.info("Scan, throw Region Server Stopped Exception for replica "
            + e.getEnvironment().getRegion().getRegionInfo());

          throw new RegionServerStoppedException(
            "Server " + e.getEnvironment().getServerName() + " not running");
        } else {
          LOG.info("Scan, We're replica region " + replicaId);
        }
      } else {
        LOG.info("Scan, We're replica region " + replicaId);
      }
    }
  }

  @BeforeClass
  public static void beforeClass() throws Exception {
    // enable store file refreshing
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      REFRESH_PERIOD);

    HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
    HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
    HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
    HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
    HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);

    // Wait longer for the primary call to make sure the client gets the exception from the primary
    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);

    // Make sure master does not host system tables.
    HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none");

    // Set system coprocessor so it can be applied to meta regions
    HTU.getConfiguration().set("hbase.coprocessor.region.classes",
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());

    HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
      META_SCAN_TIMEOUT_IN_MILLISEC * 1000);

    HTU.startMiniCluster(NB_SERVERS);
    // Enable meta replicas on the server side
    HBaseTestingUtility.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, 2);

    HTU.getHBaseCluster().startMaster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    if (HTU2 != null) HTU2.shutdownMiniCluster();
    HTU.shutdownMiniCluster();
  }

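  /**
   * Creates a table with region replicas and the SlowMeCopro, checks that a default Get is served
   * by the primary (not stale) and that a TIMELINE Get returns stale data while the primary is
   * blocked, then deletes the table.
   */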
  @Test
  public void testCreateDeleteTable() throws IOException {
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());
    Table table = HTU.createTable(hdt, new byte[][] { f }, null);

    Put p = new Put(row);
    p.addColumn(f, row, row);
    table.put(p);

    Get g = new Get(row);
    Result r = table.get(g);
    Assert.assertFalse(r.isStale());

    try {
      // But if we ask for stale we will get it
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      SlowMeCopro.cdl.get().countDown();
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }

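  /**
   * Alters a replicated table (adds a column family while the table is disabled) and verifies
   * that both default and TIMELINE reads still work after the schema change.
   */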
  @Test
  public void testChangeTable() throws Exception {
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
      .setRegionReplication(NB_SERVERS).setCoprocessor(SlowMeCopro.class.getName())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f)).build();
    HTU.getAdmin().createTable(td);
    Table table = HTU.getConnection().getTable(td.getTableName());
    // basic test: it should work.
    Put p = new Put(row);
    p.addColumn(f, row, row);
    table.put(p);

    Get g = new Get(row);
    Result r = table.get(g);
    Assert.assertFalse(r.isStale());

    // Add a CF, it should work.
    TableDescriptor bHdt = HTU.getAdmin().getDescriptor(td.getTableName());
    td = TableDescriptorBuilder.newBuilder(td)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(row)).build();
    HTU.getAdmin().disableTable(td.getTableName());
    HTU.getAdmin().modifyTable(td);
    HTU.getAdmin().enableTable(td.getTableName());
    TableDescriptor nHdt = HTU.getAdmin().getDescriptor(td.getTableName());
    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
      bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());

    p = new Put(row);
    p.addColumn(row, row, row);
    table.put(p);

    g = new Get(row);
    r = table.get(g);
    Assert.assertFalse(r.isStale());

    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    Admin admin = HTU.getAdmin();
    nHdt = admin.getDescriptor(td.getTableName());
    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
      bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());

    admin.disableTable(td.getTableName());
    admin.deleteTable(td.getTableName());
    admin.close();
  }

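  /**
   * Starts a second minicluster, sets up replication between the two clusters, and verifies via
   * TIMELINE gets that the row becomes visible (marked stale) on both the source and the peer
   * cluster.
   */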
  @SuppressWarnings("deprecation")
  @Test
  public void testReplicaAndReplication() throws Exception {
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);

    HColumnDescriptor fam = new HColumnDescriptor(row);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    hdt.addFamily(fam);

    hdt.addCoprocessor(SlowMeCopro.class.getName());
    HTU.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);

    Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
    conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    MiniZooKeeperCluster miniZK = HTU.getZkCluster();

    HTU2 = new HBaseTestingUtility(conf2);
    HTU2.setZkCluster(miniZK);
    HTU2.startMiniCluster(NB_SERVERS);
    LOG.info("Setup second Zk");
    HTU2.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);

    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(HTU2.getClusterKey());
    admin.addPeer("2", rpc, null);
    admin.close();

    Put p = new Put(row);
    p.addColumn(row, row, row);
    final Table table = HTU.getConnection().getTable(hdt.getTableName());
    table.put(p);

    HTU.getAdmin().flush(table.getName());
    LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");

    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table.get(g);
          Assert.assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    });
    table.close();
    LOG.info("stale get on the first cluster done. Now for the second.");

    final Table table2 = HTU2.getConnection().getTable(hdt.getTableName());
    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table2.get(g);
          Assert.assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    });
    table2.close();

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());

    HTU2.getAdmin().disableTable(hdt.getTableName());
    HTU2.deleteTable(hdt.getTableName());

    // We shutdown the HTU2 minicluster later, in afterClass(), as shutting down
    // the minicluster has the negative impact of deleting all HConnections in the JVM.
  }

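  /**
   * Bulk loads HFiles through SecureBulkLoadClient and verifies the rows can be read from the
   * primary as well as, with TIMELINE consistency, from the replicas.
   */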
  @Test
  public void testBulkLoad() throws IOException {
    // Create table then get the single region for our new table.
    LOG.debug("Creating test table");
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());
    Table table = HTU.createTable(hdt, new byte[][] { f }, null);

    // create hfiles to load.
    LOG.debug("Creating test data");
    Path dir = HTU.getDataTestDirOnTestFS(name.getMethodName());
    final int numRows = 10;
    final byte[] qual = Bytes.toBytes("qual");
    final byte[] val = Bytes.toBytes("val");
    final List<Pair<byte[], String>> famPaths = new ArrayList<>();
    for (HColumnDescriptor col : hdt.getColumnFamilies()) {
      Path hfile = new Path(dir, col.getNameAsString());
      TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(), qual,
        val, numRows);
      famPaths.add(new Pair<>(col.getName(), hfile.toString()));
    }

    // bulk load HFiles
    LOG.debug("Loading test data");
    final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
    table = conn.getTable(hdt.getTableName());
    final String bulkToken =
      new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
    ClientServiceCallable<Void> callable =
      new ClientServiceCallable<Void>(conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0),
        new RpcControllerFactory(HTU.getConfiguration()).newController(), HConstants.PRIORITY_UNSET,
        Collections.emptyMap()) {
        @Override
        protected Void rpcCall() throws Exception {
          LOG.debug("Going to connect to server " + getLocation() + " for row "
            + Bytes.toStringBinary(getRow()));
          SecureBulkLoadClient secureClient = null;
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          try (Table table = conn.getTable(getTableName())) {
            secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table);
            secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, true, null,
              bulkToken);
          }
          return null;
        }
      };
    RpcRetryingCallerFactory factory =
      new RpcRetryingCallerFactory(HTU.getConfiguration(), conn.getConnectionConfiguration());
    RpcRetryingCaller<Void> caller = factory.newCaller();
    caller.callWithRetries(callable, 10000);

    // verify we can read them from the primary
    LOG.debug("Verifying data load");
    for (int i = 0; i < numRows; i++) {
      byte[] row = TestHRegionServerBulkLoad.rowkey(i);
      Get g = new Get(row);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
    }

    // verify we can read them from the replica
    LOG.debug("Verifying replica queries");
    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      for (int i = 0; i < numRows; i++) {
        byte[] row = TestHRegionServerBulkLoad.rowkey(i);
        Get g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        Result r = table.get(g);
        Assert.assertTrue(r.isStale());
      }
      SlowMeCopro.cdl.get().countDown();
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }

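  /**
   * With RegionServerStoppedCopro failing gets on replicas 0 and 1, verifies that a TIMELINE Get
   * is still served by another replica and returns a stale result.
   */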
  @Test
  public void testReplicaGetWithPrimaryDown() throws IOException {
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // But if we ask for stale we will get it
      Get g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

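  /**
   * Same as {@link #testReplicaGetWithPrimaryDown()} but for scans: a TIMELINE scan should still
   * return stale results when replicas 0 and 1 throw RegionServerStoppedException.
   */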
  @Test
  public void testReplicaScanWithPrimaryDown() throws IOException {
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());

    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // But if we ask for stale we will get it
      // Instantiating the Scan class
      Scan scan = new Scan();

      // Scanning the required columns
      scan.addFamily(f);
      scan.setConsistency(Consistency.TIMELINE);

      // Getting the scan result
      ResultScanner scanner = table.getScanner(scan);

      Result r = scanner.next();

      Assert.assertTrue(r.isStale());
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

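  /**
   * Repeats the stale-get scenario with hbase.rpc.client.impl set to AsyncRpcClient and a
   * dedicated thread for writing, to check TIMELINE reads with that client configuration.
   */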
  @Test
  public void testReplicaGetWithAsyncRpcClientImpl() throws IOException {
    HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
    HTU.getConfiguration().set("hbase.rpc.client.impl",
      "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());

    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      try {
        // Create a new connection so the new config can kick in
        Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table t = connection.getTable(hdt.getTableName());

        // But if we ask for stale we will get it
        SlowMeCopro.cdl.set(new CountDownLatch(1));
        Get g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        Result r = t.get(g);
        Assert.assertTrue(r.isStale());
        SlowMeCopro.cdl.get().countDown();
      } finally {
        SlowMeCopro.cdl.get().countDown();
        SlowMeCopro.sleepTime.set(0);
      }
    } finally {
      HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting");
      HTU.getConfiguration().unset("hbase.rpc.client.impl");
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  // This test verifies that when hbase.client.metaReplicaCallTimeout.scan is configured, a meta
  // table scan always gets its result from the primary meta region, as long as the primary
  // returns within the configured timeout.
  @Test
  public void testGetRegionLocationFromPrimaryMetaRegion()
    throws IOException, InterruptedException {
    HTU.getAdmin().setBalancerRunning(false, true);
    Configuration conf = new Configuration(HTU.getConfiguration());
    conf.setBoolean(USE_META_REPLICAS, true);
    Connection conn = ConnectionFactory.createConnection(conf);

    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(2);
    try {

      HTU.createTable(hdt, new byte[][] { f }, null);

      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = true;

      // Get the user table location; it should always come from the primary meta replica
      RegionLocations url =
        ((ClusterConnection) conn).locateRegion(hdt.getTableName(), row, false, false);

    } finally {
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = false;
      HTU.getAdmin().setBalancerRunning(true, true);
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  // This test simulates the case where both the primary meta region and the primary user region
  // are down; the HBase client should still be able to access user replica regions and return
  // stale data. Meta replicas are enabled to show that a meta replica region can be out of sync
  // with the primary meta region.
  @Test
  public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException {
    HTU.getAdmin().setBalancerRunning(false, true);

    Configuration conf = new Configuration(HTU.getConfiguration());
    conf.setBoolean(USE_META_REPLICAS, true);
    Connection conn = ConnectionFactory.createConnection(conf);

    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(2);
    try {

      HTU.createTable(hdt, new byte[][] { f }, null);
      Table table = conn.getTable(TableName.valueOf(name.getMethodName()));

      // Get the meta location
      RegionLocations mrl = ((ClusterConnection) conn).locateRegion(TableName.META_TABLE_NAME,
        HConstants.EMPTY_START_ROW, false, false);

      // Get the user table location
      RegionLocations url =
        ((ClusterConnection) conn).locateRegion(hdt.getTableName(), row, false, false);

      // Make sure that the user primary region is co-hosted with the meta region
      if (
        !url.getDefaultRegionLocation().getServerName()
          .equals(mrl.getDefaultRegionLocation().getServerName())
      ) {
        HTU.moveRegionAndWait(url.getDefaultRegionLocation().getRegionInfo(),
          mrl.getDefaultRegionLocation().getServerName());
      }

      // Make sure that the user replica region is not hosted by the same region server as the
      // primary
      if (
        url.getRegionLocation(1).getServerName()
          .equals(mrl.getDefaultRegionLocation().getServerName())
      ) {
        HTU.moveRegionAndWait(url.getRegionLocation(1).getRegionInfo(),
          url.getDefaultRegionLocation().getServerName());
      }

      // Wait until the meta table is updated with new location info
      while (true) {
        mrl = ((ClusterConnection) conn).locateRegion(TableName.META_TABLE_NAME,
          HConstants.EMPTY_START_ROW, false, false);

        // Get the user table location
        url = ((ClusterConnection) conn).locateRegion(hdt.getTableName(), row, false, true);

        LOG.info("meta locations " + mrl);
        LOG.info("table locations " + url);
        ServerName a = url.getDefaultRegionLocation().getServerName();
        ServerName b = mrl.getDefaultRegionLocation().getServerName();
        if (a.equals(b)) {
          break;
        } else {
          LOG.info("Waiting for new region info to be updated in meta table");
          Thread.sleep(100);
        }
      }

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // Simulate the RS being down
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = true;

      // The first Get is supposed to succeed
      Get g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());

      // The second Get will succeed as well
      r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false;
      HTU.getAdmin().setBalancerRunning(true, true);
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }
}