/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests failover of secondary region replicas.
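 * <p>
 * The tests run a mini cluster of three region servers and create a table with region replication
 * of 3. Async WAL replication to the secondary replicas and "wait for primary flush" on replica
 * open are both enabled in {@link #before()}.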
 */
@Category(LargeTests.class)
public class TestRegionReplicaFailover {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionReplicaFailover.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicaFailover.class);

  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();

  private static final int NB_SERVERS = 3;

  protected final byte[][] families =
    new byte[][] { HBaseTestingUtil.fam1, HBaseTestingUtil.fam2, HBaseTestingUtil.fam3 };
  protected final byte[] fam = HBaseTestingUtil.fam1;
  protected final byte[] qual1 = Bytes.toBytes("qual1");
  protected final byte[] value1 = Bytes.toBytes("value1");
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");

  @Rule
  public TestName name = new TestName();

  private TableDescriptor htd;

  @Before
  public void before() throws Exception {
    Configuration conf = HTU.getConfiguration();
    // Up the handlers; this test needs more than usual.
    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);

    HTU.startMiniCluster(NB_SERVERS);
    htd = HTU.createModifyableTableDescriptor(
      TableName.valueOf(name.getMethodName().substring(0, name.getMethodName().length() - 3)),
      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED).setRegionReplication(3).build();
    HTU.getAdmin().createTable(htd);
  }

  @After
  public void after() throws Exception {
    HTU.deleteTableIfAny(htd.getTableName());
    HTU.shutdownMiniCluster();
  }

  /**
   * Tests that for a newly created table with region replicas and no data, the secondary region
   * replicas are available to read immediately.
   */
  @Test
  public void testSecondaryRegionWithEmptyRegion() throws IOException {
    // Create a new table with region replication, don't put any data. Test that the secondary
    // region replica is available to read.
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(htd.getTableName())) {

      Get get = new Get(row);
      get.setConsistency(Consistency.TIMELINE);
      get.setReplicaId(1);
      table.get(get); // this should not block
    }
  }

  /**
   * Tests that if there is some data in the primary region, re-opening the region replicas
   * (enable/disable the table, etc.) makes the region replicas readable.
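   * Disabling the table flushes and closes the regions, so when they are re-opened the secondary
   * replica can serve the flushed data without first waiting for async WAL replication to catch
   * up.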
   */
  @Test
  public void testSecondaryRegionWithNonEmptyRegion() throws IOException {
    // Create a new table with region replication and load some data,
    // then disable and enable the table again and verify the data from the secondary
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(htd.getTableName())) {

      HTU.loadNumericRows(table, fam, 0, 1000);

      HTU.getAdmin().disableTable(htd.getTableName());
      HTU.getAdmin().enableTable(htd.getTableName());

      HTU.verifyNumericRows(table, fam, 0, 1000, 1);
    }
  }

  /**
   * Tests that killing the primary region with unflushed data recovers, and the data remains
   * readable from the primary and both secondary replicas.
   */
  @Test
  public void testPrimaryRegionKill() throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(htd.getTableName())) {

      HTU.loadNumericRows(table, fam, 0, 1000);

      // wal replication is async, we have to wait until the replication catches up, or we timeout
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);

      // we should not have any flushed files yet, only data in the memstores of primary and
      // secondaries. Kill the server hosting the primary region replica now, and ensure that when
      // it comes back up, we can still read the same data from the primary and the secondaries.
      boolean aborted = false;
      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
        for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
          if (r.getRegionInfo().getReplicaId() == 0) {
            LOG.info("Aborting region server hosting primary region replica");
            rs.getRegionServer().abort("for test");
            aborted = true;
            break;
          }
        }
      }
      assertTrue(aborted);

      // wal replication is async, we have to wait until the replication catches up, or we timeout
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 0, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
    }

    // restart the region server
    HTU.getMiniHBaseCluster().startRegionServer();
  }

  /**
   * WAL replication is async; we have to wait until the replication catches up, or we time out.
   */
  private void verifyNumericRowsWithTimeout(final Table table, final byte[] f, final int startRow,
    final int endRow, final int replicaId, final long timeout) throws Exception {
    try {
      HTU.waitFor(timeout, new Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          try {
            HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
            return true;
          } catch (AssertionError ae) {
            return false;
          }
        }
      });
    } catch (Throwable t) {
      // ignore the timeout, but redo the verify to surface the actual exception
      HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
    }
  }

  /**
   * Tests that killing a secondary region replica with unflushed data recovers, and the replica
   * becomes available to read again shortly.
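   * Since {@code REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY} is enabled in {@link #before()},
   * the re-opened secondary first asks the primary to flush and then opens the flushed files, so
   * the verification may have to wait for that flush to complete.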
   */
  @Test
  public void testSecondaryRegionKill() throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(htd.getTableName())) {
      HTU.loadNumericRows(table, fam, 0, 1000);

      // wait for some time to ensure that async wal replication does its magic
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);

      // we should not have any flushed files yet, only data in the memstores of primary and
      // secondaries. Kill the server hosting the secondary region replica now, and ensure that
      // when it comes back up, we can still read the same data from it.
      boolean aborted = false;
      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
        for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
          if (r.getRegionInfo().getReplicaId() == 1) {
            LOG.info("Aborting region server hosting secondary region replica");
            rs.getRegionServer().abort("for test");
            aborted = true;
            break;
          }
        }
      }
      assertTrue(aborted);

      // It takes extra time for the replica region to become ready for reads: during region open,
      // it asks the primary region to flush, and the replica then opens the newly flushed hfiles
      // to avoid serving out-of-sync data.
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      HTU.verifyNumericRows(table, fam, 0, 1000, 2);
    }

    // restart the region server
    HTU.getMiniHBaseCluster().startRegionServer();
  }

  /**
   * Tests the case where there are 3 region replicas and the primary is continuously accepting new
   * writes while one of the secondaries is killed. Verification is done for both of the secondary
   * replicas.
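   * The loader thread keeps writing 1000-row batches until the aborter thread has killed the
   * server hosting replica 1; all three replicas are then verified up to the last fully loaded
   * key.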
   */
  @Test
  public void testSecondaryRegionKillWhilePrimaryIsAcceptingWrites() throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(htd.getTableName()); Admin admin = connection.getAdmin()) {
      // load some base data, then start a thread that keeps loading the primary
      HTU.loadNumericRows(table, fam, 0, 1000); // start with some base
      admin.flush(table.getName());
      HTU.loadNumericRows(table, fam, 1000, 2000);

      final AtomicReference<Throwable> ex = new AtomicReference<>(null);
      final AtomicBoolean done = new AtomicBoolean(false);
      final AtomicInteger key = new AtomicInteger(2000);

      Thread loader = new Thread() {
        @Override
        public void run() {
          while (!done.get()) {
            try {
              HTU.loadNumericRows(table, fam, key.get(), key.get() + 1000);
              key.addAndGet(1000);
            } catch (Throwable e) {
              ex.compareAndSet(null, e);
            }
          }
        }
      };
      loader.start();

      Thread aborter = new Thread() {
        @Override
        public void run() {
          try {
            boolean aborted = false;
            for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
              for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
                if (r.getRegionInfo().getReplicaId() == 1) {
                  LOG.info("Aborting region server hosting secondary region replica");
                  rs.getRegionServer().abort("for test");
                  aborted = true;
                }
              }
            }
            assertTrue(aborted);
          } catch (Throwable e) {
            ex.compareAndSet(null, e);
          }
        }
      };

      aborter.start();
      aborter.join();
      done.set(true);
      loader.join();

      assertNull(ex.get());

      assertTrue(key.get() > 1000); // assert that the test is working as designed
      LOG.info("Loaded up to key: " + key.get());
      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 0, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 2, 30000);
    }

    // restart the region server
    HTU.getMiniHBaseCluster().startRegionServer();
  }

  /**
   * Tests creating a table with a lot of regions and replicas. Opening the region replicas should
   * not block handlers on the RS indefinitely.
   */
  @Test
  public void testLotsOfRegionReplicas() throws IOException {
    int numRegions = NB_SERVERS * 20;
    int regionReplication = 10;
    String tableName = htd.getTableName().getNameAsString() + "2";
    htd = HTU
      .createModifyableTableDescriptor(TableName.valueOf(tableName),
        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
      .setRegionReplication(regionReplication).build();

    // don't care about the exact split points too much
    byte[] startKey = Bytes.toBytes("aaa");
    byte[] endKey = Bytes.toBytes("zzz");
    byte[][] splits = HTU.getRegionSplitStartKeys(startKey, endKey, numRegions);
    HTU.getAdmin().createTable(htd, startKey, endKey, numRegions);

    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(htd.getTableName())) {

      for (int i = 1; i < splits.length; i++) {
        for (int j = 0; j < regionReplication; j++) {
          Get get = new Get(splits[i]);
          get.setConsistency(Consistency.TIMELINE);
          get.setReplicaId(j);
          table.get(get); // this should not block. Regions should be coming online
        }
      }
    }

    HTU.deleteTableIfAny(TableName.valueOf(tableName));
  }
}