001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion; 021import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion; 022 023import java.io.IOException; 024import java.util.List; 025import java.util.Random; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.ThreadLocalRandom; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.concurrent.atomic.AtomicReference; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.KeyValue; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.TestMetaTableAccessor; 039import org.apache.hadoop.hbase.client.Consistency; 040import org.apache.hadoop.hbase.client.Get; 041import org.apache.hadoop.hbase.client.Put; 042import org.apache.hadoop.hbase.client.RegionInfo; 043import org.apache.hadoop.hbase.client.RegionLocator; 044import org.apache.hadoop.hbase.client.RegionReplicaUtil; 045import org.apache.hadoop.hbase.client.Result; 046import org.apache.hadoop.hbase.client.Table; 047import org.apache.hadoop.hbase.testclassification.LargeTests; 048import org.apache.hadoop.hbase.testclassification.RegionServerTests; 049import org.apache.hadoop.hbase.util.Bytes; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 052import org.apache.hadoop.hbase.util.Threads; 053import org.apache.hadoop.hdfs.DFSConfigKeys; 054import org.apache.hadoop.util.StringUtils; 055import org.junit.AfterClass; 056import org.junit.Assert; 057import org.junit.BeforeClass; 058import org.junit.ClassRule; 059import org.junit.Test; 060import org.junit.experimental.categories.Category; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 065 066import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 067import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 068import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 069 070/** 071 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole cluster. 072 * See {@link TestRegionServerNoMaster}. 073 */ 074@Category({ RegionServerTests.class, LargeTests.class }) 075public class TestRegionReplicas { 076 077 @ClassRule 078 public static final HBaseClassTestRule CLASS_RULE = 079 HBaseClassTestRule.forClass(TestRegionReplicas.class); 080 081 private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicas.class); 082 083 private static final int NB_SERVERS = 1; 084 private static Table table; 085 private static final byte[] row = Bytes.toBytes("TestRegionReplicas"); 086 087 private static RegionInfo hriPrimary; 088 private static RegionInfo hriSecondary; 089 090 private static final HBaseTestingUtil HTU = new HBaseTestingUtil(); 091 private static final byte[] f = HConstants.CATALOG_FAMILY; 092 093 @BeforeClass 094 public static void before() throws Exception { 095 // Reduce the hdfs block size and prefetch to trigger the file-link reopen 096 // when the file is moved to archive (e.g. compaction) 097 HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192); 098 HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1); 099 HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024); 100 101 HTU.startMiniCluster(NB_SERVERS); 102 final TableName tableName = TableName.valueOf(TestRegionReplicas.class.getSimpleName()); 103 104 // Create table then get the single region for our new table. 105 table = HTU.createTable(tableName, f); 106 107 try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) { 108 hriPrimary = locator.getRegionLocation(row, false).getRegion(); 109 } 110 111 // mock a secondary region info to open 112 hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1); 113 114 // No master 115 TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU); 116 } 117 118 @AfterClass 119 public static void afterClass() throws Exception { 120 HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false; 121 table.close(); 122 HTU.shutdownMiniCluster(); 123 } 124 125 private HRegionServer getRS() { 126 return HTU.getMiniHBaseCluster().getRegionServer(0); 127 } 128 129 @Test 130 public void testOpenRegionReplica() throws Exception { 131 openRegion(HTU, getRS(), hriSecondary); 132 try { 133 // load some data to primary 134 HTU.loadNumericRows(table, f, 0, 1000); 135 136 // assert that we can read back from primary 137 Assert.assertEquals(1000, HBaseTestingUtil.countRows(table)); 138 } finally { 139 HTU.deleteNumericRows(table, f, 0, 1000); 140 closeRegion(HTU, getRS(), hriSecondary); 141 } 142 } 143 144 /** Tests that the meta location is saved for secondary regions */ 145 @Test 146 public void testRegionReplicaUpdatesMetaLocation() throws Exception { 147 openRegion(HTU, getRS(), hriSecondary); 148 Table meta = null; 149 try { 150 meta = HTU.getConnection().getTable(TableName.META_TABLE_NAME); 151 TestMetaTableAccessor.assertMetaLocation(meta, hriPrimary.getRegionName(), 152 getRS().getServerName(), -1, 1, false); 153 } finally { 154 if (meta != null) { 155 meta.close(); 156 } 157 closeRegion(HTU, getRS(), hriSecondary); 158 } 159 } 160 161 @Test 162 public void testRegionReplicaGets() throws Exception { 163 try { 164 // load some data to primary 165 HTU.loadNumericRows(table, f, 0, 1000); 166 // assert that we can read back from primary 167 Assert.assertEquals(1000, HBaseTestingUtil.countRows(table)); 168 // flush so that region replica can read 169 HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 170 region.flush(true); 171 172 openRegion(HTU, getRS(), hriSecondary); 173 174 // first try directly against region 175 region = getRS().getRegion(hriSecondary.getEncodedName()); 176 assertGet(region, 42, true); 177 178 assertGetRpc(hriSecondary, 42, true); 179 } finally { 180 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); 181 closeRegion(HTU, getRS(), hriSecondary); 182 } 183 } 184 185 @Test 186 public void testGetOnTargetRegionReplica() throws Exception { 187 try { 188 // load some data to primary 189 HTU.loadNumericRows(table, f, 0, 1000); 190 // assert that we can read back from primary 191 Assert.assertEquals(1000, HBaseTestingUtil.countRows(table)); 192 // flush so that region replica can read 193 HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 194 region.flush(true); 195 196 openRegion(HTU, getRS(), hriSecondary); 197 198 // try directly Get against region replica 199 byte[] row = Bytes.toBytes(String.valueOf(42)); 200 Get get = new Get(row); 201 get.setConsistency(Consistency.TIMELINE); 202 get.setReplicaId(1); 203 Result result = table.get(get); 204 Assert.assertArrayEquals(row, result.getValue(f, null)); 205 } finally { 206 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); 207 closeRegion(HTU, getRS(), hriSecondary); 208 } 209 } 210 211 private void assertGet(Region region, int value, boolean expect) throws IOException { 212 byte[] row = Bytes.toBytes(String.valueOf(value)); 213 Get get = new Get(row); 214 Result result = region.get(get); 215 if (expect) { 216 Assert.assertArrayEquals(row, result.getValue(f, null)); 217 } else { 218 result.isEmpty(); 219 } 220 } 221 222 // build a mock rpc 223 private void assertGetRpc(RegionInfo info, int value, boolean expect) 224 throws IOException, org.apache.hbase.thirdparty.com.google.protobuf.ServiceException { 225 byte[] row = Bytes.toBytes(String.valueOf(value)); 226 Get get = new Get(row); 227 ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get); 228 ClientProtos.GetResponse getResp = getRS().getRSRpcServices().get(null, getReq); 229 Result result = ProtobufUtil.toResult(getResp.getResult()); 230 if (expect) { 231 Assert.assertArrayEquals(row, result.getValue(f, null)); 232 } else { 233 result.isEmpty(); 234 } 235 } 236 237 private void restartRegionServer() throws Exception { 238 afterClass(); 239 before(); 240 } 241 242 @Test 243 public void testRefresStoreFiles() throws Exception { 244 // enable store file refreshing 245 final int refreshPeriod = 2000; // 2 sec 246 HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100); 247 HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 248 refreshPeriod); 249 // restart the region server so that it starts the refresher chore 250 restartRegionServer(); 251 252 try { 253 LOG.info("Opening the secondary region " + hriSecondary.getEncodedName()); 254 openRegion(HTU, getRS(), hriSecondary); 255 256 // load some data to primary 257 LOG.info("Loading data to primary region"); 258 HTU.loadNumericRows(table, f, 0, 1000); 259 // assert that we can read back from primary 260 Assert.assertEquals(1000, HBaseTestingUtil.countRows(table)); 261 // flush so that region replica can read 262 LOG.info("Flushing primary region"); 263 HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 264 region.flush(true); 265 266 // ensure that chore is run 267 LOG.info("Sleeping for " + (4 * refreshPeriod)); 268 Threads.sleep(4 * refreshPeriod); 269 270 LOG.info("Checking results from secondary region replica"); 271 Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName()); 272 Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount()); 273 274 assertGet(secondaryRegion, 42, true); 275 assertGetRpc(hriSecondary, 42, true); 276 assertGetRpc(hriSecondary, 1042, false); 277 278 // load some data to primary 279 HTU.loadNumericRows(table, f, 1000, 1100); 280 region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 281 region.flush(true); 282 283 HTU.loadNumericRows(table, f, 2000, 2100); 284 region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 285 region.flush(true); 286 287 // ensure that chore is run 288 Threads.sleep(4 * refreshPeriod); 289 290 assertGetRpc(hriSecondary, 42, true); 291 assertGetRpc(hriSecondary, 1042, true); 292 assertGetRpc(hriSecondary, 2042, true); 293 294 // ensure that we see the 3 store files 295 Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount()); 296 297 // force compaction 298 HTU.compact(table.getName(), true); 299 300 long wakeUpTime = EnvironmentEdgeManager.currentTime() + 4 * refreshPeriod; 301 while (EnvironmentEdgeManager.currentTime() < wakeUpTime) { 302 assertGetRpc(hriSecondary, 42, true); 303 assertGetRpc(hriSecondary, 1042, true); 304 assertGetRpc(hriSecondary, 2042, true); 305 Threads.sleep(10); 306 } 307 308 // ensure that we see the compacted file only 309 // This will be 4 until the cleaner chore runs 310 Assert.assertEquals(4, secondaryRegion.getStore(f).getStorefilesCount()); 311 312 } finally { 313 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); 314 closeRegion(HTU, getRS(), hriSecondary); 315 } 316 } 317 318 @Test 319 public void testFlushAndCompactionsInPrimary() throws Exception { 320 321 long runtime = 30 * 1000; 322 // enable store file refreshing 323 final int refreshPeriod = 100; // 100ms refresh is a lot 324 HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3); 325 HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 326 refreshPeriod); 327 // restart the region server so that it starts the refresher chore 328 restartRegionServer(); 329 final int startKey = 0, endKey = 1000; 330 331 try { 332 openRegion(HTU, getRS(), hriSecondary); 333 334 // load some data to primary so that reader won't fail 335 HTU.loadNumericRows(table, f, startKey, endKey); 336 TestRegionServerNoMaster.flushRegion(HTU, hriPrimary); 337 // ensure that chore is run 338 Threads.sleep(2 * refreshPeriod); 339 340 final AtomicBoolean running = new AtomicBoolean(true); 341 @SuppressWarnings("unchecked") 342 final AtomicReference<Exception>[] exceptions = new AtomicReference[3]; 343 for (int i = 0; i < exceptions.length; i++) { 344 exceptions[i] = new AtomicReference<>(); 345 } 346 347 Runnable writer = new Runnable() { 348 int key = startKey; 349 350 @Override 351 public void run() { 352 try { 353 while (running.get()) { 354 byte[] data = Bytes.toBytes(String.valueOf(key)); 355 Put put = new Put(data); 356 put.addColumn(f, null, data); 357 table.put(put); 358 key++; 359 if (key == endKey) { 360 key = startKey; 361 } 362 } 363 } catch (Exception ex) { 364 LOG.warn(ex.toString(), ex); 365 exceptions[0].compareAndSet(null, ex); 366 } 367 } 368 }; 369 370 Runnable flusherCompactor = new Runnable() { 371 Random random = ThreadLocalRandom.current(); 372 373 public void run() { 374 try { 375 while (running.get()) { 376 // flush or compact 377 if (random.nextBoolean()) { 378 TestRegionServerNoMaster.flushRegion(HTU, hriPrimary); 379 } else { 380 HTU.compact(table.getName(), random.nextBoolean()); 381 } 382 } 383 } catch (Exception ex) { 384 LOG.warn(ex.toString(), ex); 385 exceptions[1].compareAndSet(null, ex); 386 } 387 } 388 }; 389 390 Runnable reader = new Runnable() { 391 @Override 392 public void run() { 393 try { 394 Random random = ThreadLocalRandom.current(); 395 while (running.get()) { 396 // whether to do a close and open 397 if (random.nextInt(10) == 0) { 398 try { 399 closeRegion(HTU, getRS(), hriSecondary); 400 } catch (Exception ex) { 401 LOG.warn("Failed closing the region " + hriSecondary + " " 402 + StringUtils.stringifyException(ex)); 403 exceptions[2].compareAndSet(null, ex); 404 } 405 try { 406 openRegion(HTU, getRS(), hriSecondary); 407 } catch (Exception ex) { 408 LOG.warn("Failed opening the region " + hriSecondary + " " 409 + StringUtils.stringifyException(ex)); 410 exceptions[2].compareAndSet(null, ex); 411 } 412 } 413 414 int key = random.nextInt(endKey - startKey) + startKey; 415 assertGetRpc(hriSecondary, key, true); 416 } 417 } catch (Exception ex) { 418 LOG.warn("Failed getting the value in the region " + hriSecondary + " " 419 + StringUtils.stringifyException(ex)); 420 exceptions[2].compareAndSet(null, ex); 421 } 422 } 423 }; 424 425 LOG.info("Starting writer and reader, secondary={}", hriSecondary.getEncodedName()); 426 ExecutorService executor = Executors.newFixedThreadPool(3); 427 executor.submit(writer); 428 executor.submit(flusherCompactor); 429 executor.submit(reader); 430 431 // wait for threads 432 Threads.sleep(runtime); 433 running.set(false); 434 executor.shutdown(); 435 executor.awaitTermination(30, TimeUnit.SECONDS); 436 437 for (AtomicReference<Exception> exRef : exceptions) { 438 Assert.assertNull(exRef.get()); 439 } 440 } finally { 441 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey); 442 try { 443 closeRegion(HTU, getRS(), hriSecondary); 444 } catch (ServiceException e) { 445 LOG.info("Closing wrong region {}", hriSecondary, e); 446 } 447 } 448 } 449 450 @Test 451 public void testVerifySecondaryAbilityToReadWithOnFiles() throws Exception { 452 // disable the store file refresh chore (we do this by hand) 453 HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0); 454 restartRegionServer(); 455 456 try { 457 LOG.info("Opening the secondary region " + hriSecondary.getEncodedName()); 458 openRegion(HTU, getRS(), hriSecondary); 459 460 // load some data to primary 461 LOG.info("Loading data to primary region"); 462 for (int i = 0; i < 3; ++i) { 463 HTU.loadNumericRows(table, f, i * 1000, (i + 1) * 1000); 464 HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 465 region.flush(true); 466 } 467 468 HRegion primaryRegion = getRS().getRegion(hriPrimary.getEncodedName()); 469 Assert.assertEquals(3, primaryRegion.getStore(f).getStorefilesCount()); 470 471 // Refresh store files on the secondary 472 Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName()); 473 secondaryRegion.getStore(f).refreshStoreFiles(); 474 Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount()); 475 476 // force compaction 477 LOG.info("Force Major compaction on primary region " + hriPrimary); 478 primaryRegion.compact(true); 479 Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount()); 480 List<RegionServerThread> regionServerThreads = 481 HTU.getMiniHBaseCluster().getRegionServerThreads(); 482 HRegionServer hrs = null; 483 for (RegionServerThread rs : regionServerThreads) { 484 if ( 485 rs.getRegionServer().getOnlineRegion(primaryRegion.getRegionInfo().getRegionName()) 486 != null 487 ) { 488 hrs = rs.getRegionServer(); 489 break; 490 } 491 } 492 CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, hrs, false); 493 cleaner.chore(); 494 // scan all the hfiles on the secondary. 495 // since there are no read on the secondary when we ask locations to 496 // the NN a FileNotFound exception will be returned and the FileLink 497 // should be able to deal with it giving us all the result we expect. 498 int keys = 0; 499 int sum = 0; 500 for (HStoreFile sf : ((HStore) secondaryRegion.getStore(f)).getStorefiles()) { 501 // Our file does not exist anymore. was moved by the compaction above. 502 LOG.debug(Boolean.toString(getRS().getFileSystem().exists(sf.getPath()))); 503 Assert.assertFalse(getRS().getFileSystem().exists(sf.getPath())); 504 sf.initReader(); 505 try (StoreFileScanner scanner = sf.getPreadScanner(false, Long.MAX_VALUE, 0, false)) { 506 scanner.seek(KeyValue.LOWESTKEY); 507 for (Cell cell;;) { 508 cell = scanner.next(); 509 if (cell == null) { 510 break; 511 } 512 keys++; 513 sum += Integer.parseInt( 514 Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); 515 } 516 } 517 } 518 Assert.assertEquals(3000, keys); 519 Assert.assertEquals(4498500, sum); 520 } finally { 521 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); 522 closeRegion(HTU, getRS(), hriSecondary); 523 } 524 } 525}