001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.Collections; 023import java.util.List; 024import java.util.Map; 025import java.util.Optional; 026import java.util.TreeMap; 027import java.util.concurrent.CountDownLatch; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.atomic.AtomicLong; 030import java.util.concurrent.atomic.AtomicReference; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseConfiguration; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.Waiter; 040import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 041import org.apache.hadoop.hbase.coprocessor.ObserverContext; 042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 044import org.apache.hadoop.hbase.coprocessor.RegionObserver; 045import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 046import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; 047import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; 048import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 049import org.apache.hadoop.hbase.testclassification.ClientTests; 050import org.apache.hadoop.hbase.testclassification.LargeTests; 051import org.apache.hadoop.hbase.tool.BulkLoadHFiles; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 054import org.junit.AfterClass; 055import org.junit.Assert; 056import org.junit.BeforeClass; 057import org.junit.ClassRule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063@Category({ LargeTests.class, ClientTests.class }) 064public class TestReplicaWithCluster { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestReplicaWithCluster.class); 069 070 private static final Logger LOG = LoggerFactory.getLogger(TestReplicaWithCluster.class); 071 072 private static final int NB_SERVERS = 3; 073 private static final byte[] row = Bytes.toBytes(TestReplicaWithCluster.class.getName()); 074 private static final HBaseTestingUtil HTU = new HBaseTestingUtil(); 075 076 // second minicluster used in testing of replication 077 private static HBaseTestingUtil HTU2; 078 private static final byte[] f = HConstants.CATALOG_FAMILY; 079 080 private final static int REFRESH_PERIOD = 1000; 081 private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200; 082 083 /** 084 * This copro is used to synchronize the tests. 085 */ 086 public static class SlowMeCopro implements RegionCoprocessor, RegionObserver { 087 static final AtomicLong sleepTime = new AtomicLong(0); 088 static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(new CountDownLatch(0)); 089 090 public SlowMeCopro() { 091 } 092 093 @Override 094 public Optional<RegionObserver> getRegionObserver() { 095 return Optional.of(this); 096 } 097 098 @Override 099 public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 100 final Get get, final List<Cell> results) throws IOException { 101 102 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) { 103 CountDownLatch latch = cdl.get(); 104 try { 105 if (sleepTime.get() > 0) { 106 LOG.info("Sleeping for " + sleepTime.get() + " ms"); 107 Thread.sleep(sleepTime.get()); 108 } else if (latch.getCount() > 0) { 109 LOG.info("Waiting for the counterCountDownLatch"); 110 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. 111 if (latch.getCount() > 0) { 112 throw new RuntimeException("Can't wait more"); 113 } 114 } 115 } catch (InterruptedException e1) { 116 LOG.error(e1.toString(), e1); 117 } 118 } else { 119 LOG.info("We're not the primary replicas."); 120 } 121 } 122 } 123 124 /** 125 * This copro is used to simulate region server down exception for Get and Scan 126 */ 127 @CoreCoprocessor 128 public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver { 129 130 public RegionServerStoppedCopro() { 131 } 132 133 @Override 134 public Optional<RegionObserver> getRegionObserver() { 135 return Optional.of(this); 136 } 137 138 @Override 139 public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 140 final Get get, final List<Cell> results) throws IOException { 141 142 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); 143 144 // Fail for the primary replica and replica 1 145 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) { 146 LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId); 147 throw new RegionServerStoppedException( 148 "Server " + e.getEnvironment().getServerName() + " not running"); 149 } else { 150 LOG.info("We're replica region " + replicaId); 151 } 152 } 153 154 @Override 155 public void preScannerOpen(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 156 final Scan scan) throws IOException { 157 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); 158 // Fail for the primary replica and replica 1 159 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) { 160 LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId); 161 throw new RegionServerStoppedException( 162 "Server " + e.getEnvironment().getServerName() + " not running"); 163 } else { 164 LOG.info("We're replica region " + replicaId); 165 } 166 } 167 } 168 169 /** 170 * This copro is used to slow down the primary meta region scan a bit 171 */ 172 public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro 173 implements RegionCoprocessor, RegionObserver { 174 static boolean slowDownPrimaryMetaScan = false; 175 static boolean throwException = false; 176 177 @Override 178 public Optional<RegionObserver> getRegionObserver() { 179 return Optional.of(this); 180 } 181 182 @Override 183 public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 184 final Get get, final List<Cell> results) throws IOException { 185 186 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); 187 188 // Fail for the primary replica, but not for meta 189 if (throwException) { 190 if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) { 191 LOG.info("Get, throw Region Server Stopped Exceptoin for region " 192 + e.getEnvironment().getRegion().getRegionInfo()); 193 throw new RegionServerStoppedException( 194 "Server " + e.getEnvironment().getServerName() + " not running"); 195 } 196 } else { 197 LOG.info("Get, We're replica region " + replicaId); 198 } 199 } 200 201 @Override 202 public void preScannerOpen(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 203 final Scan scan) throws IOException { 204 205 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); 206 207 // Slow down with the primary meta region scan 208 if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) { 209 if (slowDownPrimaryMetaScan) { 210 LOG.info("Scan with primary meta region, slow down a bit"); 211 try { 212 Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50); 213 } catch (InterruptedException ie) { 214 // Ingore 215 } 216 } 217 218 // Fail for the primary replica 219 if (throwException) { 220 LOG.info("Scan, throw Region Server Stopped Exceptoin for replica " 221 + e.getEnvironment().getRegion().getRegionInfo()); 222 223 throw new RegionServerStoppedException( 224 "Server " + e.getEnvironment().getServerName() + " not running"); 225 } else { 226 LOG.info("Scan, We're replica region " + replicaId); 227 } 228 } else { 229 LOG.info("Scan, We're replica region " + replicaId); 230 } 231 } 232 } 233 234 @BeforeClass 235 public static void beforeClass() throws Exception { 236 // enable store file refreshing 237 HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 238 REFRESH_PERIOD); 239 240 HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f); 241 HTU.getConfiguration().setInt("replication.source.size.capacity", 10240); 242 HTU.getConfiguration().setLong("replication.source.sleepforretries", 100); 243 HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2); 244 HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10); 245 HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1); 246 HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10); 247 248 // Wait for primary call longer so make sure that it will get exception from the primary call 249 HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000); 250 HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000); 251 252 // Make sure master does not host system tables. 253 HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none"); 254 255 // Set system coprocessor so it can be applied to meta regions 256 HTU.getConfiguration().set("hbase.coprocessor.region.classes", 257 RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName()); 258 259 HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT, 260 META_SCAN_TIMEOUT_IN_MILLISEC * 1000); 261 262 HTU.startMiniCluster(NB_SERVERS); 263 // Enable meta replica at server side 264 HBaseTestingUtil.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, 2); 265 266 HTU.getHBaseCluster().startMaster(); 267 } 268 269 @AfterClass 270 public static void afterClass() throws Exception { 271 if (HTU2 != null) HTU2.shutdownMiniCluster(); 272 HTU.shutdownMiniCluster(); 273 } 274 275 @Test 276 public void testCreateDeleteTable() throws IOException { 277 // Create table then get the single region for our new table. 278 TableDescriptorBuilder builder = 279 HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"), 280 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER, 281 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 282 builder.setRegionReplication(NB_SERVERS); 283 builder.setCoprocessor(SlowMeCopro.class.getName()); 284 TableDescriptor hdt = builder.build(); 285 Table table = HTU.createTable(hdt, new byte[][] { f }, null); 286 287 Put p = new Put(row); 288 p.addColumn(f, row, row); 289 table.put(p); 290 291 Get g = new Get(row); 292 Result r = table.get(g); 293 Assert.assertFalse(r.isStale()); 294 295 try { 296 // But if we ask for stale we will get it 297 SlowMeCopro.cdl.set(new CountDownLatch(1)); 298 g = new Get(row); 299 g.setConsistency(Consistency.TIMELINE); 300 r = table.get(g); 301 Assert.assertTrue(r.isStale()); 302 SlowMeCopro.cdl.get().countDown(); 303 } finally { 304 SlowMeCopro.cdl.get().countDown(); 305 SlowMeCopro.sleepTime.set(0); 306 } 307 308 HTU.getAdmin().disableTable(hdt.getTableName()); 309 HTU.deleteTable(hdt.getTableName()); 310 } 311 312 @Test 313 public void testChangeTable() throws Exception { 314 TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("testChangeTable")) 315 .setRegionReplication(NB_SERVERS).setCoprocessor(SlowMeCopro.class.getName()) 316 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f)).build(); 317 HTU.getAdmin().createTable(td); 318 Table table = HTU.getConnection().getTable(td.getTableName()); 319 // basic test: it should work. 320 Put p = new Put(row); 321 p.addColumn(f, row, row); 322 table.put(p); 323 324 Get g = new Get(row); 325 Result r = table.get(g); 326 Assert.assertFalse(r.isStale()); 327 328 // Add a CF, it should work. 329 TableDescriptor bHdt = HTU.getAdmin().getDescriptor(td.getTableName()); 330 td = TableDescriptorBuilder.newBuilder(td) 331 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(row)).build(); 332 HTU.getAdmin().disableTable(td.getTableName()); 333 HTU.getAdmin().modifyTable(td); 334 HTU.getAdmin().enableTable(td.getTableName()); 335 TableDescriptor nHdt = HTU.getAdmin().getDescriptor(td.getTableName()); 336 Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()), 337 bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount()); 338 339 p = new Put(row); 340 p.addColumn(row, row, row); 341 table.put(p); 342 343 g = new Get(row); 344 r = table.get(g); 345 Assert.assertFalse(r.isStale()); 346 347 try { 348 SlowMeCopro.cdl.set(new CountDownLatch(1)); 349 g = new Get(row); 350 g.setConsistency(Consistency.TIMELINE); 351 r = table.get(g); 352 Assert.assertTrue(r.isStale()); 353 } finally { 354 SlowMeCopro.cdl.get().countDown(); 355 SlowMeCopro.sleepTime.set(0); 356 } 357 358 Admin admin = HTU.getAdmin(); 359 nHdt = admin.getDescriptor(td.getTableName()); 360 Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()), 361 bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount()); 362 363 admin.disableTable(td.getTableName()); 364 admin.deleteTable(td.getTableName()); 365 admin.close(); 366 } 367 368 @SuppressWarnings("deprecation") 369 @Test 370 public void testReplicaAndReplication() throws Exception { 371 TableDescriptorBuilder builder = 372 HTU.createModifyableTableDescriptor("testReplicaAndReplication"); 373 builder.setRegionReplication(NB_SERVERS); 374 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(row) 375 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()); 376 377 builder.setCoprocessor(SlowMeCopro.class.getName()); 378 TableDescriptor tableDescriptor = builder.build(); 379 HTU.getAdmin().createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 380 381 Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration()); 382 conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 383 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); 384 MiniZooKeeperCluster miniZK = HTU.getZkCluster(); 385 386 HTU2 = new HBaseTestingUtil(conf2); 387 HTU2.setZkCluster(miniZK); 388 HTU2.startMiniCluster(NB_SERVERS); 389 LOG.info("Setup second Zk"); 390 HTU2.getAdmin().createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 391 392 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 393 Admin admin = connection.getAdmin()) { 394 ReplicationPeerConfig rpc = 395 ReplicationPeerConfig.newBuilder().setClusterKey(HTU2.getRpcConnnectionURI()).build(); 396 admin.addReplicationPeer("2", rpc); 397 } 398 399 Put p = new Put(row); 400 p.addColumn(row, row, row); 401 final Table table = HTU.getConnection().getTable(tableDescriptor.getTableName()); 402 table.put(p); 403 404 HTU.getAdmin().flush(table.getName()); 405 LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster."); 406 407 Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() { 408 @Override 409 public boolean evaluate() throws Exception { 410 try { 411 SlowMeCopro.cdl.set(new CountDownLatch(1)); 412 Get g = new Get(row); 413 g.setConsistency(Consistency.TIMELINE); 414 Result r = table.get(g); 415 Assert.assertTrue(r.isStale()); 416 return !r.isEmpty(); 417 } finally { 418 SlowMeCopro.cdl.get().countDown(); 419 SlowMeCopro.sleepTime.set(0); 420 } 421 } 422 }); 423 table.close(); 424 LOG.info("stale get on the first cluster done. Now for the second."); 425 426 final Table table2 = HTU.getConnection().getTable(tableDescriptor.getTableName()); 427 Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() { 428 @Override 429 public boolean evaluate() throws Exception { 430 try { 431 SlowMeCopro.cdl.set(new CountDownLatch(1)); 432 Get g = new Get(row); 433 g.setConsistency(Consistency.TIMELINE); 434 Result r = table2.get(g); 435 Assert.assertTrue(r.isStale()); 436 return !r.isEmpty(); 437 } finally { 438 SlowMeCopro.cdl.get().countDown(); 439 SlowMeCopro.sleepTime.set(0); 440 } 441 } 442 }); 443 table2.close(); 444 445 HTU.getAdmin().disableTable(tableDescriptor.getTableName()); 446 HTU.deleteTable(tableDescriptor.getTableName()); 447 448 HTU2.getAdmin().disableTable(tableDescriptor.getTableName()); 449 HTU2.deleteTable(tableDescriptor.getTableName()); 450 451 // We shutdown HTU2 minicluster later, in afterClass(), as shutting down 452 // the minicluster has negative impact of deleting all HConnections in JVM. 453 } 454 455 @Test 456 public void testBulkLoad() throws IOException { 457 // Create table then get the single region for our new table. 458 LOG.debug("Creating test table"); 459 TableDescriptorBuilder builder = HTU.createModifyableTableDescriptor( 460 TableName.valueOf("testBulkLoad"), ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, 461 HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 462 builder.setRegionReplication(NB_SERVERS); 463 builder.setCoprocessor(SlowMeCopro.class.getName()); 464 TableDescriptor hdt = builder.build(); 465 Table table = HTU.createTable(hdt, new byte[][] { f }, null); 466 467 // create hfiles to load. 468 LOG.debug("Creating test data"); 469 Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad"); 470 final int numRows = 10; 471 final byte[] qual = Bytes.toBytes("qual"); 472 final byte[] val = Bytes.toBytes("val"); 473 Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR); 474 for (ColumnFamilyDescriptor col : hdt.getColumnFamilies()) { 475 Path hfile = new Path(dir, col.getNameAsString()); 476 TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(), qual, 477 val, numRows); 478 family2Files.put(col.getName(), Collections.singletonList(hfile)); 479 } 480 481 // bulk load HFiles 482 LOG.debug("Loading test data"); 483 BulkLoadHFiles.create(HTU.getConfiguration()).bulkLoad(hdt.getTableName(), family2Files); 484 485 // verify we can read them from the primary 486 LOG.debug("Verifying data load"); 487 for (int i = 0; i < numRows; i++) { 488 byte[] row = TestHRegionServerBulkLoad.rowkey(i); 489 Get g = new Get(row); 490 Result r = table.get(g); 491 Assert.assertFalse(r.isStale()); 492 } 493 494 // verify we can read them from the replica 495 LOG.debug("Verifying replica queries"); 496 try { 497 SlowMeCopro.cdl.set(new CountDownLatch(1)); 498 for (int i = 0; i < numRows; i++) { 499 byte[] row = TestHRegionServerBulkLoad.rowkey(i); 500 Get g = new Get(row); 501 g.setConsistency(Consistency.TIMELINE); 502 Result r = table.get(g); 503 Assert.assertTrue(r.isStale()); 504 } 505 SlowMeCopro.cdl.get().countDown(); 506 } finally { 507 SlowMeCopro.cdl.get().countDown(); 508 SlowMeCopro.sleepTime.set(0); 509 } 510 511 HTU.getAdmin().disableTable(hdt.getTableName()); 512 HTU.deleteTable(hdt.getTableName()); 513 } 514 515 @Test 516 public void testReplicaGetWithPrimaryDown() throws IOException { 517 // Create table then get the single region for our new table. 518 TableDescriptorBuilder builder = 519 HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"), 520 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER, 521 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 522 builder.setRegionReplication(NB_SERVERS); 523 builder.setCoprocessor(RegionServerStoppedCopro.class.getName()); 524 TableDescriptor hdt = builder.build(); 525 try { 526 Table table = HTU.createTable(hdt, new byte[][] { f }, null); 527 528 Put p = new Put(row); 529 p.addColumn(f, row, row); 530 table.put(p); 531 532 // Flush so it can be picked by the replica refresher thread 533 HTU.flush(table.getName()); 534 535 // Sleep for some time until data is picked up by replicas 536 try { 537 Thread.sleep(2 * REFRESH_PERIOD); 538 } catch (InterruptedException e1) { 539 LOG.error(e1.toString(), e1); 540 } 541 542 // But if we ask for stale we will get it 543 Get g = new Get(row); 544 g.setConsistency(Consistency.TIMELINE); 545 Result r = table.get(g); 546 Assert.assertTrue(r.isStale()); 547 } finally { 548 HTU.getAdmin().disableTable(hdt.getTableName()); 549 HTU.deleteTable(hdt.getTableName()); 550 } 551 } 552 553 @Test 554 public void testReplicaScanWithPrimaryDown() throws IOException { 555 // Create table then get the single region for our new table. 556 TableDescriptorBuilder builder = 557 HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"), 558 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER, 559 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 560 builder.setRegionReplication(NB_SERVERS); 561 builder.setCoprocessor(RegionServerStoppedCopro.class.getName()); 562 TableDescriptor hdt = builder.build(); 563 try { 564 Table table = HTU.createTable(hdt, new byte[][] { f }, null); 565 566 Put p = new Put(row); 567 p.addColumn(f, row, row); 568 table.put(p); 569 570 // Flush so it can be picked by the replica refresher thread 571 HTU.flush(table.getName()); 572 573 // Sleep for some time until data is picked up by replicas 574 try { 575 Thread.sleep(2 * REFRESH_PERIOD); 576 } catch (InterruptedException e1) { 577 LOG.error(e1.toString(), e1); 578 } 579 580 // But if we ask for stale we will get it 581 // Instantiating the Scan class 582 Scan scan = new Scan(); 583 584 // Scanning the required columns 585 scan.addFamily(f); 586 scan.setConsistency(Consistency.TIMELINE); 587 588 // Getting the scan result 589 ResultScanner scanner = table.getScanner(scan); 590 591 Result r = scanner.next(); 592 593 Assert.assertTrue(r.isStale()); 594 } finally { 595 HTU.getAdmin().disableTable(hdt.getTableName()); 596 HTU.deleteTable(hdt.getTableName()); 597 } 598 } 599 600 @Test 601 public void testReplicaGetWithAsyncRpcClientImpl() throws IOException { 602 HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true); 603 HTU.getConfiguration().set("hbase.rpc.client.impl", 604 "org.apache.hadoop.hbase.ipc.AsyncRpcClient"); 605 // Create table then get the single region for our new table. 606 TableDescriptorBuilder builder = 607 HTU.createModifyableTableDescriptor(TableName.valueOf("testReplicaGetWithAsyncRpcClientImpl"), 608 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER, 609 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 610 builder.setRegionReplication(NB_SERVERS); 611 builder.setCoprocessor(SlowMeCopro.class.getName()); 612 TableDescriptor hdt = builder.build(); 613 try { 614 Table table = HTU.createTable(hdt, new byte[][] { f }, null); 615 616 Put p = new Put(row); 617 p.addColumn(f, row, row); 618 table.put(p); 619 620 // Flush so it can be picked by the replica refresher thread 621 HTU.flush(table.getName()); 622 623 // Sleep for some time until data is picked up by replicas 624 try { 625 Thread.sleep(2 * REFRESH_PERIOD); 626 } catch (InterruptedException e1) { 627 LOG.error(e1.toString(), e1); 628 } 629 630 try { 631 // Create the new connection so new config can kick in 632 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 633 Table t = connection.getTable(hdt.getTableName()); 634 635 // But if we ask for stale we will get it 636 SlowMeCopro.cdl.set(new CountDownLatch(1)); 637 Get g = new Get(row); 638 g.setConsistency(Consistency.TIMELINE); 639 Result r = t.get(g); 640 Assert.assertTrue(r.isStale()); 641 SlowMeCopro.cdl.get().countDown(); 642 } finally { 643 SlowMeCopro.cdl.get().countDown(); 644 SlowMeCopro.sleepTime.set(0); 645 } 646 } finally { 647 HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting"); 648 HTU.getConfiguration().unset("hbase.rpc.client.impl"); 649 HTU.getAdmin().disableTable(hdt.getTableName()); 650 HTU.deleteTable(hdt.getTableName()); 651 } 652 } 653}