/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ LargeTests.class, ClientTests.class })
public class TestReplicaWithCluster {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicaWithCluster.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicaWithCluster.class);

  private static final int NB_SERVERS = 3;
  private static final byte[] row = TestReplicaWithCluster.class.getName().getBytes();
  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();

  // second minicluster used in testing of replication
  private static HBaseTestingUtility HTU2;
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  private final static int REFRESH_PERIOD = 1000;
  private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;

  @Rule
  public TestName name = new TestName();

  /**
   * This copro is used to synchronize the tests.
   */
  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(new CountDownLatch(0));

    public SlowMeCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
      final List<Cell> results) throws IOException {

      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        CountDownLatch latch = cdl.get();
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          } else if (latch.getCount() > 0) {
            LOG.info("Waiting for the CountDownLatch");
            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
            if (latch.getCount() > 0) {
              throw new RuntimeException("Can't wait more");
            }
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
        }
      } else {
        LOG.info("We're not the primary replica.");
      }
    }
  }

  /**
   * This copro is used to simulate a region server down exception for Get and Scan.
   */
  @CoreCoprocessor
  public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver {

    public RegionServerStoppedCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
      final List<Cell> results) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Fail for the primary replica and replica 1
      if (replicaId <= 1) {
        LOG.info("Throw RegionServerStoppedException for replica id " + replicaId);
        throw new RegionServerStoppedException(
          "Server " + e.getEnvironment().getServerName() + " not running");
      } else {
        LOG.info("We're replica region " + replicaId);
      }
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Scan scan) throws IOException {
      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
      // Fail for the primary replica and replica 1
      if (replicaId <= 1) {
        LOG.info("Throw RegionServerStoppedException for replica id " + replicaId);
        throw new RegionServerStoppedException(
          "Server " + e.getEnvironment().getServerName() + " not running");
      } else {
        LOG.info("We're replica region " + replicaId);
      }
    }
  }

  /**
   * This copro is used to slow down the primary meta region scan a bit.
   */
  public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro
    implements RegionCoprocessor, RegionObserver {
    static boolean slowDownPrimaryMetaScan = false;
    static boolean throwException = false;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
      final List<Cell> results) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Fail for the primary replica, but not for meta
      if (throwException) {
        if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
          LOG.info("Get, throw RegionServerStoppedException for region "
            + e.getEnvironment().getRegion().getRegionInfo());
          throw new RegionServerStoppedException(
            "Server " + e.getEnvironment().getServerName() + " not running");
        }
      } else {
        LOG.info("Get, we're replica region " + replicaId);
      }
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Scan scan) throws IOException {

      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Slow down the primary meta region scan
      if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
        if (slowDownPrimaryMetaScan) {
          LOG.info("Scan with primary meta region, slow down a bit");
          try {
            Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
          } catch (InterruptedException ie) {
            // Ignore
          }
        }

        // Fail for the primary replica
        if (throwException) {
          LOG.info("Scan, throw RegionServerStoppedException for replica "
            + e.getEnvironment().getRegion().getRegionInfo());

          throw new RegionServerStoppedException(
            "Server " + e.getEnvironment().getServerName() + " not running");
        } else {
          LOG.info("Scan, we're replica region " + replicaId);
        }
      } else {
        LOG.info("Scan, we're replica region " + replicaId);
      }
    }
  }

  @BeforeClass
  public static void beforeClass() throws Exception {
    // enable store file refreshing
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      REFRESH_PERIOD);

    HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
    HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
    HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
    HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
    HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);

    // Wait longer for the primary call so the test reliably gets the exception from the primary
    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);

    // Make sure master does not host system tables.
    HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none");

    // Set system coprocessor so it can be applied to meta regions
    HTU.getConfiguration().set("hbase.coprocessor.region.classes",
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());

    HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
      META_SCAN_TIMEOUT_IN_MILLISEC * 1000);

    HTU.startMiniCluster(NB_SERVERS);
    // Enable meta replica at server side
    HBaseTestingUtility.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, 2);

    HTU.getHBaseCluster().startMaster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    if (HTU2 != null) HTU2.shutdownMiniCluster();
    HTU.shutdownMiniCluster();
  }

  @Test
  public void testCreateDeleteTable() throws IOException {
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());
    Table table = HTU.createTable(hdt, new byte[][] { f }, null);

    Put p = new Put(row);
    p.addColumn(f, row, row);
    table.put(p);

    Get g = new Get(row);
    Result r = table.get(g);
    Assert.assertFalse(r.isStale());

    try {
      // But if we ask for stale we will get it
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      SlowMeCopro.cdl.get().countDown();
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }
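  /**
   * Checks that altering a table (adding a column family) does not break region replicas: after
   * the schema change, writes to the new family are readable from the primary, and a TIMELINE get
   * still returns a (stale) result from a secondary replica while the primary is blocked by
   * SlowMeCopro.
   */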
  @Test
  public void testChangeTable() throws Exception {
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
      .setRegionReplication(NB_SERVERS).setCoprocessor(SlowMeCopro.class.getName())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f)).build();
    HTU.getAdmin().createTable(td);
    Table table = HTU.getConnection().getTable(td.getTableName());
    // basic test: it should work.
    Put p = new Put(row);
    p.addColumn(f, row, row);
    table.put(p);

    Get g = new Get(row);
    Result r = table.get(g);
    Assert.assertFalse(r.isStale());

    // Add a CF, it should work.
    TableDescriptor bHdt = HTU.getAdmin().getDescriptor(td.getTableName());
    td = TableDescriptorBuilder.newBuilder(td)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(row)).build();
    HTU.getAdmin().disableTable(td.getTableName());
    HTU.getAdmin().modifyTable(td);
    HTU.getAdmin().enableTable(td.getTableName());
    TableDescriptor nHdt = HTU.getAdmin().getDescriptor(td.getTableName());
    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
      bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());

    p = new Put(row);
    p.addColumn(row, row, row);
    table.put(p);

    g = new Get(row);
    r = table.get(g);
    Assert.assertFalse(r.isStale());

    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    Admin admin = HTU.getAdmin();
    nHdt = admin.getDescriptor(td.getTableName());
    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
      bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());

    admin.disableTable(td.getTableName());
    admin.deleteTable(td.getTableName());
    admin.close();
  }

  @SuppressWarnings("deprecation")
  @Test
  public void testReplicaAndReplication() throws Exception {
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);

    HColumnDescriptor fam = new HColumnDescriptor(row);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    hdt.addFamily(fam);

    hdt.addCoprocessor(SlowMeCopro.class.getName());
    HTU.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);

    Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
    conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    MiniZooKeeperCluster miniZK = HTU.getZkCluster();

    HTU2 = new HBaseTestingUtility(conf2);
    HTU2.setZkCluster(miniZK);
    HTU2.startMiniCluster(NB_SERVERS);
    LOG.info("Setup second Zk");
    HTU2.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);

    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(HTU2.getClusterKey());
    admin.addPeer("2", rpc, null);
    admin.close();

    Put p = new Put(row);
    p.addColumn(row, row, row);
    final Table table = HTU.getConnection().getTable(hdt.getTableName());
    table.put(p);

    HTU.getAdmin().flush(table.getName());
    LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");

    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table.get(g);
          Assert.assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    });
    table.close();
    LOG.info("stale get on the first cluster done. Now for the second.");
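    // Now the same check against the second cluster: the row must have been replicated there and
    // a TIMELINE get must return a (stale) result from a secondary replica.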
    final Table table2 = HTU2.getConnection().getTable(hdt.getTableName());
    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table2.get(g);
          Assert.assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    });
    table2.close();

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());

    HTU2.getAdmin().disableTable(hdt.getTableName());
    HTU2.deleteTable(hdt.getTableName());

    // We shut down the HTU2 minicluster later, in afterClass(), because shutting down the
    // minicluster has the side effect of deleting all HConnections in the JVM.
  }

  @Test
  public void testBulkLoad() throws IOException {
    // Create table then get the single region for our new table.
    LOG.debug("Creating test table");
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());
    Table table = HTU.createTable(hdt, new byte[][] { f }, null);

    // create hfiles to load.
    LOG.debug("Creating test data");
    Path dir = HTU.getDataTestDirOnTestFS(name.getMethodName());
    final int numRows = 10;
    final byte[] qual = Bytes.toBytes("qual");
    final byte[] val = Bytes.toBytes("val");
    final List<Pair<byte[], String>> famPaths = new ArrayList<>();
    for (HColumnDescriptor col : hdt.getColumnFamilies()) {
      Path hfile = new Path(dir, col.getNameAsString());
      TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(), qual,
        val, numRows);
      famPaths.add(new Pair<>(col.getName(), hfile.toString()));
    }

    // bulk load HFiles
    LOG.debug("Loading test data");
    final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
    table = conn.getTable(hdt.getTableName());
    final String bulkToken =
      new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
    ClientServiceCallable<Void> callable =
      new ClientServiceCallable<Void>(conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0),
        new RpcControllerFactory(HTU.getConfiguration()).newController(), HConstants.PRIORITY_UNSET,
        Collections.emptyMap()) {
        @Override
        protected Void rpcCall() throws Exception {
          LOG.debug("Going to connect to server " + getLocation() + " for row "
            + Bytes.toStringBinary(getRow()));
          SecureBulkLoadClient secureClient = null;
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          try (Table table = conn.getTable(getTableName())) {
            secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table);
            secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, true, null,
              bulkToken);
          }
          return null;
        }
      };
    RpcRetryingCallerFactory factory =
      new RpcRetryingCallerFactory(HTU.getConfiguration(), conn.getConnectionConfiguration());
    RpcRetryingCaller<Void> caller = factory.newCaller();
    caller.callWithRetries(callable, 10000);

    // verify we can read them from the primary
    LOG.debug("Verifying data load");
    for (int i = 0; i < numRows; i++) {
      byte[] row = TestHRegionServerBulkLoad.rowkey(i);
      Get g = new Get(row);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
    }

    // verify we can read them from the replica
    LOG.debug("Verifying replica queries");
    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      for (int i = 0; i < numRows; i++) {
        byte[] row = TestHRegionServerBulkLoad.rowkey(i);
        Get g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        Result r = table.get(g);
        Assert.assertTrue(r.isStale());
      }
      SlowMeCopro.cdl.get().countDown();
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }

  @Test
  public void testReplicaGetWithPrimaryDown() throws IOException {
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // But if we ask for stale we will get it
      Get g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  @Test
  public void testReplicaScanWithPrimaryDown() throws IOException {
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());

    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // But if we ask for stale we will get it
      // Instantiate the Scan
      Scan scan = new Scan();

      // Scan the required columns
      scan.addFamily(f);
      scan.setConsistency(Consistency.TIMELINE);

      // Get the scan result
      ResultScanner scanner = table.getScanner(scan);

      Result r = scanner.next();

      Assert.assertTrue(r.isStale());
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }
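  /**
   * Same stale-read check as in testCreateDeleteTable, but with the client configured to use a
   * dedicated writer thread and the AsyncRpcClient implementation (via "hbase.rpc.client.impl").
   */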
  @Test
  public void testReplicaGetWithAsyncRpcClientImpl() throws IOException {
    HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
    HTU.getConfiguration().set("hbase.rpc.client.impl",
      "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());

    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      try {
        // Create a new connection so the new config can kick in
        Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table t = connection.getTable(hdt.getTableName());

        // But if we ask for stale we will get it
        SlowMeCopro.cdl.set(new CountDownLatch(1));
        Get g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        Result r = t.get(g);
        Assert.assertTrue(r.isStale());
        SlowMeCopro.cdl.get().countDown();
      } finally {
        SlowMeCopro.cdl.get().countDown();
        SlowMeCopro.sleepTime.set(0);
      }
    } finally {
      HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting");
      HTU.getConfiguration().unset("hbase.rpc.client.impl");
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  // This test checks that when hbase.client.metaReplicaCallTimeout.scan is configured, a meta
  // table scan always gets its result from the primary meta region, as long as the primary meta
  // region returns within the configured timeout.
  @Test
  public void testGetRegionLocationFromPrimaryMetaRegion()
    throws IOException, InterruptedException {
    HTU.getAdmin().setBalancerRunning(false, true);
    Configuration conf = new Configuration(HTU.getConfiguration());
    conf.setBoolean(USE_META_REPLICAS, true);
    Connection conn = ConnectionFactory.createConnection(conf);

    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(2);
    try {

      HTU.createTable(hdt, new byte[][] { f }, null);

      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = true;

      // Get the user table location; it should always come from the primary meta replica
      RegionLocations url =
        ((ClusterConnection) conn).locateRegion(hdt.getTableName(), row, false, false);

    } finally {
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = false;
      HTU.getAdmin().setBalancerRunning(true, true);
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }

  // This test simulates the case where the primary meta region and the primary user region are
  // down; the HBase client must still be able to access user replica regions and return stale
  // data. Meta replica is enabled to show that the meta replica region can be out of sync with
  // the primary meta region.
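  // The test co-hosts the user primary region with the primary meta region, moves the user
  // replica region to a different server, and then makes the coprocessor throw
  // RegionServerStoppedException, so TIMELINE gets must be served (stale) by the replica.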
  @Test
  public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException {
    HTU.getAdmin().setBalancerRunning(false, true);

    Configuration conf = new Configuration(HTU.getConfiguration());
    conf.setBoolean(USE_META_REPLICAS, true);
    Connection conn = ConnectionFactory.createConnection(conf);

    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
    hdt.setRegionReplication(2);
    try {

      HTU.createTable(hdt, new byte[][] { f }, null);
      Table table = conn.getTable(TableName.valueOf(name.getMethodName()));

      // Get Meta location
      RegionLocations mrl = ((ClusterConnection) conn).locateRegion(TableName.META_TABLE_NAME,
        HConstants.EMPTY_START_ROW, false, false);

      // Get user table location
      RegionLocations url =
        ((ClusterConnection) conn).locateRegion(hdt.getTableName(), row, false, false);

      // Make sure that the user primary region is co-hosted with the meta region
      if (
        !url.getDefaultRegionLocation().getServerName()
          .equals(mrl.getDefaultRegionLocation().getServerName())
      ) {
        HTU.moveRegionAndWait(url.getDefaultRegionLocation().getRegionInfo(),
          mrl.getDefaultRegionLocation().getServerName());
      }

      // Make sure that the user replica region is not hosted by the same region server as the
      // primary
      if (
        url.getRegionLocation(1).getServerName()
          .equals(mrl.getDefaultRegionLocation().getServerName())
      ) {
        HTU.moveRegionAndWait(url.getRegionLocation(1).getRegionInfo(),
          url.getDefaultRegionLocation().getServerName());
      }

      // Wait until the meta table is updated with new location info
      while (true) {
        mrl = ((ClusterConnection) conn).locateRegion(TableName.META_TABLE_NAME,
          HConstants.EMPTY_START_ROW, false, false);

        // Get user table location
        url = ((ClusterConnection) conn).locateRegion(hdt.getTableName(), row, false, true);

        LOG.info("meta locations " + mrl);
        LOG.info("table locations " + url);
        ServerName a = url.getDefaultRegionLocation().getServerName();
        ServerName b = mrl.getDefaultRegionLocation().getServerName();
        if (a.equals(b)) {
          break;
        } else {
          LOG.info("Waiting for new region info to be updated in meta table");
          Thread.sleep(100);
        }
      }

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // Simulate the RS being down
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = true;

      // The first Get is supposed to succeed
      Get g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());

      // The second Get will succeed as well
      r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false;
      HTU.getAdmin().setBalancerRunning(true, true);
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }
}