001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet; 021import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData; 022import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNotNull; 026import static org.junit.Assert.assertNull; 027import static org.junit.Assert.assertTrue; 028import static org.junit.Assert.fail; 029import static org.mockito.ArgumentMatchers.any; 030import static org.mockito.Mockito.mock; 031import static org.mockito.Mockito.spy; 032import static org.mockito.Mockito.times; 033import static org.mockito.Mockito.verify; 034import static org.mockito.Mockito.when; 035 036import java.io.FileNotFoundException; 037import java.io.IOException; 038import java.util.ArrayList; 039import java.util.List; 040import java.util.Map; 041import java.util.Objects; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.fs.FSDataOutputStream; 044import org.apache.hadoop.fs.Path; 045import org.apache.hadoop.hbase.Cell; 046import org.apache.hadoop.hbase.CellBuilderType; 047import org.apache.hadoop.hbase.CellUtil; 048import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 049import org.apache.hadoop.hbase.HBaseClassTestRule; 050import org.apache.hadoop.hbase.HBaseTestingUtil; 051import org.apache.hadoop.hbase.HConstants; 052import org.apache.hadoop.hbase.KeyValue; 053import org.apache.hadoop.hbase.ServerName; 054import org.apache.hadoop.hbase.TableName; 055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 056import org.apache.hadoop.hbase.client.Durability; 057import org.apache.hadoop.hbase.client.Get; 058import org.apache.hadoop.hbase.client.Put; 059import org.apache.hadoop.hbase.client.RegionInfo; 060import org.apache.hadoop.hbase.client.RegionInfoBuilder; 061import org.apache.hadoop.hbase.client.TableDescriptor; 062import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 063import org.apache.hadoop.hbase.executor.ExecutorService; 064import org.apache.hadoop.hbase.executor.ExecutorType; 065import org.apache.hadoop.hbase.io.hfile.HFile; 066import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 067import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl; 068import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; 069import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 070import org.apache.hadoop.hbase.testclassification.LargeTests; 071import org.apache.hadoop.hbase.util.Bytes; 072import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 073import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 074import org.apache.hadoop.hbase.util.FSUtils; 075import org.apache.hadoop.hbase.util.Pair; 076import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 077import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader; 078import org.apache.hadoop.hbase.wal.WAL; 079import org.apache.hadoop.hbase.wal.WALEdit; 080import org.apache.hadoop.hbase.wal.WALFactory; 081import org.apache.hadoop.hbase.wal.WALKeyImpl; 082import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 083import org.apache.hadoop.hbase.wal.WALStreamReader; 084import org.apache.hadoop.util.StringUtils; 085import org.junit.After; 086import org.junit.AfterClass; 087import org.junit.Before; 088import org.junit.BeforeClass; 089import org.junit.ClassRule; 090import org.junit.Rule; 091import org.junit.Test; 092import org.junit.experimental.categories.Category; 093import org.junit.rules.TestName; 094import org.mockito.Mockito; 095import org.slf4j.Logger; 096import org.slf4j.LoggerFactory; 097 098import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 099import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 100 101import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 102import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 103import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 104import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; 107import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; 108import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 109import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; 110import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 111 112/** 113 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary 114 * region replicas 115 */ 116@SuppressWarnings("deprecation") 117@Category(LargeTests.class) 118public class TestHRegionReplayEvents { 119 120 @ClassRule 121 public static final HBaseClassTestRule CLASS_RULE = 122 HBaseClassTestRule.forClass(TestHRegionReplayEvents.class); 123 124 private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class); 125 @Rule 126 public TestName name = new TestName(); 127 128 private static HBaseTestingUtil TEST_UTIL; 129 130 public static Configuration CONF; 131 private String dir; 132 133 private byte[][] families = 134 new byte[][] { Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") }; 135 136 // Test names 137 protected byte[] tableName; 138 protected String method; 139 protected final byte[] row = Bytes.toBytes("rowA"); 140 protected final byte[] row2 = Bytes.toBytes("rowB"); 141 protected byte[] cq = Bytes.toBytes("cq"); 142 143 // per test fields 144 private Path rootDir; 145 private TableDescriptor htd; 146 private RegionServerServices rss; 147 private RegionInfo primaryHri, secondaryHri; 148 private HRegion primaryRegion, secondaryRegion; 149 private WAL walPrimary, walSecondary; 150 private WALStreamReader reader; 151 152 @BeforeClass 153 public static void setUpBeforeClass() throws Exception { 154 TEST_UTIL = new HBaseTestingUtil(); 155 TEST_UTIL.startMiniDFSCluster(1); 156 } 157 158 @AfterClass 159 public static void tearDownAfterClass() throws Exception { 160 LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir()); 161 TEST_UTIL.cleanupTestDir(); 162 TEST_UTIL.shutdownMiniDFSCluster(); 163 } 164 165 @Before 166 public void setUp() throws Exception { 167 CONF = TEST_UTIL.getConfiguration(); 168 dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString(); 169 method = name.getMethodName(); 170 tableName = Bytes.toBytes(name.getMethodName()); 171 rootDir = new Path(dir + method); 172 TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString()); 173 method = name.getMethodName(); 174 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method)); 175 for (byte[] family : families) { 176 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 177 } 178 htd = builder.build(); 179 180 long time = EnvironmentEdgeManager.currentTime(); 181 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 182 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 183 primaryHri = 184 RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build(); 185 secondaryHri = 186 RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build(); 187 188 WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir); 189 walPrimary = wals.getWAL(primaryHri); 190 walSecondary = wals.getWAL(secondaryHri); 191 192 rss = mock(RegionServerServices.class); 193 when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1)); 194 when(rss.getConfiguration()).thenReturn(CONF); 195 when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF)); 196 String string = 197 org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER.toString(); 198 ExecutorService es = new ExecutorService(string); 199 es.startExecutorService(es.new ExecutorConfig().setCorePoolSize(1) 200 .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)); 201 when(rss.getExecutorService()).thenReturn(es); 202 primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary); 203 primaryRegion.close(); 204 List<HRegion> regions = new ArrayList<>(); 205 regions.add(primaryRegion); 206 Mockito.doReturn(regions).when(rss).getRegions(); 207 208 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 209 secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null); 210 211 reader = null; 212 } 213 214 @After 215 public void tearDown() throws Exception { 216 if (reader != null) { 217 reader.close(); 218 } 219 220 if (primaryRegion != null) { 221 HBaseTestingUtil.closeRegionAndWAL(primaryRegion); 222 } 223 if (secondaryRegion != null) { 224 HBaseTestingUtil.closeRegionAndWAL(secondaryRegion); 225 } 226 227 EnvironmentEdgeManagerTestHelper.reset(); 228 } 229 230 String getName() { 231 return name.getMethodName(); 232 } 233 234 // Some of the test cases are as follows: 235 // 1. replay flush start marker again 236 // 2. replay flush with smaller seqId than what is there in memstore snapshot 237 // 3. replay flush with larger seqId than what is there in memstore snapshot 238 // 4. replay flush commit without flush prepare. non droppable memstore 239 // 5. replay flush commit without flush prepare. droppable memstore 240 // 6. replay open region event 241 // 7. replay open region event after flush start 242 // 8. replay flush form an earlier seqId (test ignoring seqIds) 243 // 9. start flush does not prevent region from closing. 244 245 @Test 246 public void testRegionReplicaSecondaryCannotFlush() throws IOException { 247 // load some data and flush ensure that the secondary replica will not execute the flush 248 249 // load some data to secondary by replaying 250 putDataByReplay(secondaryRegion, 0, 1000, cq, families); 251 252 verifyData(secondaryRegion, 0, 1000, cq, families); 253 254 // flush region 255 FlushResultImpl flush = (FlushResultImpl) secondaryRegion.flush(true); 256 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result); 257 258 verifyData(secondaryRegion, 0, 1000, cq, families); 259 260 // close the region, and inspect that it has not flushed 261 Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false); 262 // assert that there are no files (due to flush) 263 for (List<HStoreFile> f : files.values()) { 264 assertTrue(f.isEmpty()); 265 } 266 } 267 268 /** 269 * Tests a case where we replay only a flush start marker, then the region is closed. This region 270 * should not block indefinitely 271 */ 272 @Test 273 public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException { 274 // load some data to primary and flush 275 int start = 0; 276 LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100)); 277 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families); 278 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 279 primaryRegion.flush(true); 280 281 // now replay the edits and the flush marker 282 reader = createWALReaderForPrimary(); 283 284 LOG.info("-- Replaying edits and flush events in secondary"); 285 while (true) { 286 WAL.Entry entry = reader.next(); 287 if (entry == null) { 288 break; 289 } 290 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 291 if (flushDesc != null) { 292 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 293 LOG.info("-- Replaying flush start in secondary"); 294 secondaryRegion.replayWALFlushStartMarker(flushDesc); 295 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 296 LOG.info("-- NOT Replaying flush commit in secondary"); 297 } 298 } else { 299 replayEdit(secondaryRegion, entry); 300 } 301 } 302 303 assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0); 304 // now close the region which should not cause hold because of un-committed flush 305 secondaryRegion.close(); 306 307 // verify that the memstore size is back to what it was 308 assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize()); 309 } 310 311 static int replayEdit(HRegion region, WAL.Entry entry) throws IOException { 312 if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) { 313 return 0; // handled elsewhere 314 } 315 Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0))); 316 for (Cell cell : entry.getEdit().getCells()) 317 put.add(cell); 318 put.setDurability(Durability.SKIP_WAL); 319 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); 320 region.batchReplay(new MutationReplay[] { mutation }, entry.getKey().getSequenceId()); 321 return Integer.parseInt(Bytes.toString(put.getRow())); 322 } 323 324 private WALStreamReader createWALReaderForPrimary() throws FileNotFoundException, IOException { 325 return NoEOFWALStreamReader.create(TEST_UTIL.getTestFileSystem(), 326 AbstractFSWALProvider.getCurrentFileName(walPrimary), TEST_UTIL.getConfiguration()); 327 } 328 329 @Test 330 public void testBatchReplayWithMultipleNonces() throws IOException { 331 try { 332 MutationReplay[] mutations = new MutationReplay[100]; 333 for (int i = 0; i < 100; i++) { 334 Put put = new Put(Bytes.toBytes(i)); 335 put.setDurability(Durability.SYNC_WAL); 336 for (byte[] familly : this.families) { 337 put.addColumn(familly, this.cq, null); 338 long nonceNum = i / 10; 339 mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum); 340 } 341 } 342 primaryRegion.batchReplay(mutations, 20); 343 } catch (Exception e) { 344 String msg = "Error while replay of batch with multiple nonces. "; 345 LOG.error(msg, e); 346 fail(msg + e.getMessage()); 347 } 348 } 349 350 @Test 351 public void testReplayFlushesAndCompactions() throws IOException { 352 // initiate a secondary region with some data. 353 354 // load some data to primary and flush. 3 flushes and some more unflushed data 355 putDataWithFlushes(primaryRegion, 100, 300, 100); 356 357 // compaction from primary 358 LOG.info("-- Compacting primary, only 1 store"); 359 primaryRegion.compactStore(Bytes.toBytes("cf1"), NoLimitThroughputController.INSTANCE); 360 361 // now replay the edits and the flush marker 362 reader = createWALReaderForPrimary(); 363 364 LOG.info("-- Replaying edits and flush events in secondary"); 365 int lastReplayed = 0; 366 int expectedStoreFileCount = 0; 367 while (true) { 368 WAL.Entry entry = reader.next(); 369 if (entry == null) { 370 break; 371 } 372 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 373 CompactionDescriptor compactionDesc = 374 WALEdit.getCompaction(entry.getEdit().getCells().get(0)); 375 if (flushDesc != null) { 376 // first verify that everything is replayed and visible before flush event replay 377 verifyData(secondaryRegion, 0, lastReplayed, cq, families); 378 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 379 long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); 380 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 381 MemStoreSize mss = store.getFlushableSize(); 382 long storeSize = store.getSize(); 383 long storeSizeUncompressed = store.getStoreSizeUncompressed(); 384 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 385 LOG.info("-- Replaying flush start in secondary"); 386 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc); 387 assertNull(result.result); 388 assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber()); 389 390 // assert that the store memstore is smaller now 391 long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); 392 LOG.info("Memstore size reduced by:" 393 + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); 394 assertTrue(storeMemstoreSize > newStoreMemstoreSize); 395 396 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 397 LOG.info("-- Replaying flush commit in secondary"); 398 secondaryRegion.replayWALFlushCommitMarker(flushDesc); 399 400 // assert that the flush files are picked 401 expectedStoreFileCount++; 402 for (HStore s : secondaryRegion.getStores()) { 403 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 404 } 405 MemStoreSize newMss = store.getFlushableSize(); 406 assertTrue(mss.getHeapSize() > newMss.getHeapSize()); 407 408 // assert that the region memstore is smaller now 409 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 410 assertTrue(regionMemstoreSize > newRegionMemstoreSize); 411 412 // assert that the store sizes are bigger 413 assertTrue(store.getSize() > storeSize); 414 assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed); 415 assertEquals(store.getSize(), store.getStorefilesSize()); 416 } 417 // after replay verify that everything is still visible 418 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 419 } else if (compactionDesc != null) { 420 secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE); 421 422 // assert that the compaction is applied 423 for (HStore store : secondaryRegion.getStores()) { 424 if (store.getColumnFamilyName().equals("cf1")) { 425 assertEquals(1, store.getStorefilesCount()); 426 } else { 427 assertEquals(expectedStoreFileCount, store.getStorefilesCount()); 428 } 429 } 430 } else { 431 lastReplayed = replayEdit(secondaryRegion, entry); 432 } 433 } 434 435 assertEquals(400 - 1, lastReplayed); 436 LOG.info("-- Verifying edits from secondary"); 437 verifyData(secondaryRegion, 0, 400, cq, families); 438 439 LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted"); 440 verifyData(primaryRegion, 0, lastReplayed, cq, families); 441 for (HStore store : primaryRegion.getStores()) { 442 if (store.getColumnFamilyName().equals("cf1")) { 443 assertEquals(1, store.getStorefilesCount()); 444 } else { 445 assertEquals(expectedStoreFileCount, store.getStorefilesCount()); 446 } 447 } 448 } 449 450 /** 451 * Tests cases where we prepare a flush with some seqId and we receive other flush start markers 452 * equal to, greater or less than the previous flush start marker. 453 */ 454 @Test 455 public void testReplayFlushStartMarkers() throws IOException { 456 // load some data to primary and flush. 1 flush and some more unflushed data 457 putDataWithFlushes(primaryRegion, 100, 100, 100); 458 int numRows = 200; 459 460 // now replay the edits and the flush marker 461 reader = createWALReaderForPrimary(); 462 463 LOG.info("-- Replaying edits and flush events in secondary"); 464 465 FlushDescriptor startFlushDesc = null; 466 467 int lastReplayed = 0; 468 while (true) { 469 WAL.Entry entry = reader.next(); 470 if (entry == null) { 471 break; 472 } 473 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 474 if (flushDesc != null) { 475 // first verify that everything is replayed and visible before flush event replay 476 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 477 long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); 478 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 479 MemStoreSize mss = store.getFlushableSize(); 480 481 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 482 startFlushDesc = flushDesc; 483 LOG.info("-- Replaying flush start in secondary"); 484 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 485 assertNull(result.result); 486 assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber()); 487 assertTrue(regionMemstoreSize > 0); 488 assertTrue(mss.getHeapSize() > 0); 489 490 // assert that the store memstore is smaller now 491 long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); 492 LOG.info("Memstore size reduced by:" 493 + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); 494 assertTrue(storeMemstoreSize > newStoreMemstoreSize); 495 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 496 497 } 498 // after replay verify that everything is still visible 499 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 500 } else { 501 lastReplayed = replayEdit(secondaryRegion, entry); 502 } 503 } 504 505 // at this point, there should be some data (rows 0-100) in memstore snapshot 506 // and some more data in memstores (rows 100-200) 507 508 verifyData(secondaryRegion, 0, numRows, cq, families); 509 510 // Test case 1: replay the same flush start marker again 511 LOG.info("-- Replaying same flush start in secondary again"); 512 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 513 assertNull(result); // this should return null. Ignoring the flush start marker 514 // assert that we still have prepared flush with the previous setup. 515 assertNotNull(secondaryRegion.getPrepareFlushResult()); 516 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 517 startFlushDesc.getFlushSequenceNumber()); 518 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 519 verifyData(secondaryRegion, 0, numRows, cq, families); 520 521 // Test case 2: replay a flush start marker with a smaller seqId 522 FlushDescriptor startFlushDescSmallerSeqId = 523 clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50); 524 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId); 525 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId); 526 assertNull(result); // this should return null. Ignoring the flush start marker 527 // assert that we still have prepared flush with the previous setup. 528 assertNotNull(secondaryRegion.getPrepareFlushResult()); 529 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 530 startFlushDesc.getFlushSequenceNumber()); 531 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 532 verifyData(secondaryRegion, 0, numRows, cq, families); 533 534 // Test case 3: replay a flush start marker with a larger seqId 535 FlushDescriptor startFlushDescLargerSeqId = 536 clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50); 537 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId); 538 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId); 539 assertNull(result); // this should return null. Ignoring the flush start marker 540 // assert that we still have prepared flush with the previous setup. 541 assertNotNull(secondaryRegion.getPrepareFlushResult()); 542 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 543 startFlushDesc.getFlushSequenceNumber()); 544 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 545 verifyData(secondaryRegion, 0, numRows, cq, families); 546 547 LOG.info("-- Verifying edits from secondary"); 548 verifyData(secondaryRegion, 0, numRows, cq, families); 549 550 LOG.info("-- Verifying edits from primary."); 551 verifyData(primaryRegion, 0, numRows, cq, families); 552 } 553 554 /** 555 * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker 556 * less than the previous flush start marker. 557 */ 558 @Test 559 public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException { 560 // load some data to primary and flush. 2 flushes and some more unflushed data 561 putDataWithFlushes(primaryRegion, 100, 200, 100); 562 int numRows = 300; 563 564 // now replay the edits and the flush marker 565 reader = createWALReaderForPrimary(); 566 567 LOG.info("-- Replaying edits and flush events in secondary"); 568 FlushDescriptor startFlushDesc = null; 569 FlushDescriptor commitFlushDesc = null; 570 571 int lastReplayed = 0; 572 while (true) { 573 System.out.println(lastReplayed); 574 WAL.Entry entry = reader.next(); 575 if (entry == null) { 576 break; 577 } 578 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 579 if (flushDesc != null) { 580 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 581 // don't replay the first flush start marker, hold on to it, replay the second one 582 if (startFlushDesc == null) { 583 startFlushDesc = flushDesc; 584 } else { 585 LOG.info("-- Replaying flush start in secondary"); 586 startFlushDesc = flushDesc; 587 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 588 assertNull(result.result); 589 } 590 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 591 // do not replay any flush commit yet 592 if (commitFlushDesc == null) { 593 commitFlushDesc = flushDesc; // hold on to the first flush commit marker 594 } 595 } 596 // after replay verify that everything is still visible 597 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 598 } else { 599 lastReplayed = replayEdit(secondaryRegion, entry); 600 } 601 } 602 603 // at this point, there should be some data (rows 0-200) in memstore snapshot 604 // and some more data in memstores (rows 200-300) 605 verifyData(secondaryRegion, 0, numRows, cq, families); 606 607 // no store files in the region 608 int expectedStoreFileCount = 0; 609 for (HStore s : secondaryRegion.getStores()) { 610 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 611 } 612 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 613 614 // Test case 1: replay the a flush commit marker smaller than what we have prepared 615 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START" 616 + startFlushDesc); 617 assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber()); 618 619 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 620 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 621 622 // assert that the flush files are picked 623 expectedStoreFileCount++; 624 for (HStore s : secondaryRegion.getStores()) { 625 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 626 } 627 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 628 MemStoreSize mss = store.getFlushableSize(); 629 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 630 631 // assert that the region memstore is same as before 632 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 633 assertEquals(regionMemstoreSize, newRegionMemstoreSize); 634 635 assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped 636 637 LOG.info("-- Verifying edits from secondary"); 638 verifyData(secondaryRegion, 0, numRows, cq, families); 639 640 LOG.info("-- Verifying edits from primary."); 641 verifyData(primaryRegion, 0, numRows, cq, families); 642 } 643 644 /** 645 * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker 646 * larger than the previous flush start marker. 647 */ 648 @Test 649 public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException { 650 // load some data to primary and flush. 1 flush and some more unflushed data 651 putDataWithFlushes(primaryRegion, 100, 100, 100); 652 int numRows = 200; 653 654 // now replay the edits and the flush marker 655 reader = createWALReaderForPrimary(); 656 657 LOG.info("-- Replaying edits and flush events in secondary"); 658 FlushDescriptor startFlushDesc = null; 659 FlushDescriptor commitFlushDesc = null; 660 661 int lastReplayed = 0; 662 while (true) { 663 WAL.Entry entry = reader.next(); 664 if (entry == null) { 665 break; 666 } 667 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 668 if (flushDesc != null) { 669 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 670 if (startFlushDesc == null) { 671 LOG.info("-- Replaying flush start in secondary"); 672 startFlushDesc = flushDesc; 673 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 674 assertNull(result.result); 675 } 676 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 677 // do not replay any flush commit yet 678 // hold on to the flush commit marker but simulate a larger 679 // flush commit seqId 680 commitFlushDesc = FlushDescriptor.newBuilder(flushDesc) 681 .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50).build(); 682 } 683 // after replay verify that everything is still visible 684 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 685 } else { 686 lastReplayed = replayEdit(secondaryRegion, entry); 687 } 688 } 689 690 // at this point, there should be some data (rows 0-100) in memstore snapshot 691 // and some more data in memstores (rows 100-200) 692 verifyData(secondaryRegion, 0, numRows, cq, families); 693 694 // no store files in the region 695 int expectedStoreFileCount = 0; 696 for (HStore s : secondaryRegion.getStores()) { 697 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 698 } 699 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 700 701 // Test case 1: replay the a flush commit marker larger than what we have prepared 702 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START" 703 + startFlushDesc); 704 assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber()); 705 706 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 707 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 708 709 // assert that the flush files are picked 710 expectedStoreFileCount++; 711 for (HStore s : secondaryRegion.getStores()) { 712 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 713 } 714 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 715 MemStoreSize mss = store.getFlushableSize(); 716 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 717 718 // assert that the region memstore is smaller than before, but not empty 719 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 720 assertTrue(newRegionMemstoreSize > 0); 721 assertTrue(regionMemstoreSize > newRegionMemstoreSize); 722 723 assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped 724 725 LOG.info("-- Verifying edits from secondary"); 726 verifyData(secondaryRegion, 0, numRows, cq, families); 727 728 LOG.info("-- Verifying edits from primary."); 729 verifyData(primaryRegion, 0, numRows, cq, families); 730 } 731 732 /** 733 * Tests the case where we receive a flush commit before receiving any flush prepare markers. The 734 * memstore edits should be dropped after the flush commit replay since they should be in flushed 735 * files 736 */ 737 @Test 738 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore() 739 throws IOException { 740 testReplayFlushCommitMarkerWithoutFlushStartMarker(true); 741 } 742 743 /** 744 * Tests the case where we receive a flush commit before receiving any flush prepare markers. The 745 * memstore edits should be not dropped after the flush commit replay since not every edit will be 746 * in flushed files (based on seqId) 747 */ 748 @Test 749 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore() 750 throws IOException { 751 testReplayFlushCommitMarkerWithoutFlushStartMarker(false); 752 } 753 754 /** 755 * Tests the case where we receive a flush commit before receiving any flush prepare markers 756 */ 757 public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore) 758 throws IOException { 759 // load some data to primary and flush. 1 flushes and some more unflushed data. 760 // write more data after flush depending on whether droppableSnapshot 761 putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100); 762 int numRows = droppableMemstore ? 100 : 200; 763 764 // now replay the edits and the flush marker 765 reader = createWALReaderForPrimary(); 766 767 LOG.info("-- Replaying edits and flush events in secondary"); 768 FlushDescriptor commitFlushDesc = null; 769 770 int lastReplayed = 0; 771 while (true) { 772 WAL.Entry entry = reader.next(); 773 if (entry == null) { 774 break; 775 } 776 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 777 if (flushDesc != null) { 778 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 779 // do not replay flush start marker 780 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 781 commitFlushDesc = flushDesc; // hold on to the flush commit marker 782 } 783 // after replay verify that everything is still visible 784 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 785 } else { 786 lastReplayed = replayEdit(secondaryRegion, entry); 787 } 788 } 789 790 // at this point, there should be some data (rows 0-200) in the memstore without snapshot 791 // and some more data in memstores (rows 100-300) 792 verifyData(secondaryRegion, 0, numRows, cq, families); 793 794 // no store files in the region 795 int expectedStoreFileCount = 0; 796 for (HStore s : secondaryRegion.getStores()) { 797 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 798 } 799 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 800 801 // Test case 1: replay a flush commit marker without start flush marker 802 assertNull(secondaryRegion.getPrepareFlushResult()); 803 assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0); 804 805 // ensure all files are visible in secondary 806 for (HStore store : secondaryRegion.getStores()) { 807 assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null)); 808 } 809 810 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 811 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 812 813 // assert that the flush files are picked 814 expectedStoreFileCount++; 815 for (HStore s : secondaryRegion.getStores()) { 816 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 817 } 818 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 819 MemStoreSize mss = store.getFlushableSize(); 820 if (droppableMemstore) { 821 // assert that the memstore is dropped 822 assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); 823 } else { 824 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 825 } 826 827 // assert that the region memstore is same as before (we could not drop) 828 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 829 if (droppableMemstore) { 830 assertTrue(0 == newRegionMemstoreSize); 831 } else { 832 assertTrue(regionMemstoreSize == newRegionMemstoreSize); 833 } 834 835 LOG.info("-- Verifying edits from secondary"); 836 verifyData(secondaryRegion, 0, numRows, cq, families); 837 838 LOG.info("-- Verifying edits from primary."); 839 verifyData(primaryRegion, 0, numRows, cq, families); 840 } 841 842 private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) { 843 return FlushDescriptor.newBuilder(flush).setFlushSequenceNumber(flushSeqId).build(); 844 } 845 846 /** 847 * Tests replaying region open markers from primary region. Checks whether the files are picked up 848 */ 849 @Test 850 public void testReplayRegionOpenEvent() throws IOException { 851 putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush 852 int numRows = 100; 853 854 // close the region and open again. 855 primaryRegion.close(); 856 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 857 858 // now replay the edits and the flush marker 859 reader = createWALReaderForPrimary(); 860 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 861 862 LOG.info("-- Replaying edits and region events in secondary"); 863 while (true) { 864 WAL.Entry entry = reader.next(); 865 if (entry == null) { 866 break; 867 } 868 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 869 RegionEventDescriptor regionEventDesc = 870 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 871 872 if (flushDesc != null) { 873 // don't replay flush events 874 } else if (regionEventDesc != null) { 875 regionEvents.add(regionEventDesc); 876 } else { 877 // don't replay edits 878 } 879 } 880 881 // we should have 1 open, 1 close and 1 open event 882 assertEquals(3, regionEvents.size()); 883 884 // replay the first region open event. 885 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0)); 886 887 // replay the close event as well 888 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1)); 889 890 // no store files in the region 891 int expectedStoreFileCount = 0; 892 for (HStore s : secondaryRegion.getStores()) { 893 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 894 } 895 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 896 assertTrue(regionMemstoreSize == 0); 897 898 // now replay the region open event that should contain new file locations 899 LOG.info("Testing replaying region open event " + regionEvents.get(2)); 900 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2)); 901 902 // assert that the flush files are picked 903 expectedStoreFileCount++; 904 for (HStore s : secondaryRegion.getStores()) { 905 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 906 } 907 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 908 MemStoreSize mss = store.getFlushableSize(); 909 assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); 910 911 // assert that the region memstore is empty 912 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 913 assertTrue(newRegionMemstoreSize == 0); 914 915 assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if 916 // any 917 918 LOG.info("-- Verifying edits from secondary"); 919 verifyData(secondaryRegion, 0, numRows, cq, families); 920 921 LOG.info("-- Verifying edits from primary."); 922 verifyData(primaryRegion, 0, numRows, cq, families); 923 } 924 925 /** 926 * Tests the case where we replay a region open event after a flush start but before receiving 927 * flush commit 928 */ 929 @Test 930 public void testReplayRegionOpenEventAfterFlushStart() throws IOException { 931 putDataWithFlushes(primaryRegion, 100, 100, 100); 932 int numRows = 200; 933 934 // close the region and open again. 935 primaryRegion.close(); 936 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 937 938 // now replay the edits and the flush marker 939 reader = createWALReaderForPrimary(); 940 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 941 942 LOG.info("-- Replaying edits and region events in secondary"); 943 while (true) { 944 WAL.Entry entry = reader.next(); 945 if (entry == null) { 946 break; 947 } 948 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 949 RegionEventDescriptor regionEventDesc = 950 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 951 952 if (flushDesc != null) { 953 // only replay flush start 954 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 955 secondaryRegion.replayWALFlushStartMarker(flushDesc); 956 } 957 } else if (regionEventDesc != null) { 958 regionEvents.add(regionEventDesc); 959 } else { 960 replayEdit(secondaryRegion, entry); 961 } 962 } 963 964 // at this point, there should be some data (rows 0-100) in the memstore snapshot 965 // and some more data in memstores (rows 100-200) 966 verifyData(secondaryRegion, 0, numRows, cq, families); 967 968 // we should have 1 open, 1 close and 1 open event 969 assertEquals(3, regionEvents.size()); 970 971 // no store files in the region 972 int expectedStoreFileCount = 0; 973 for (HStore s : secondaryRegion.getStores()) { 974 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 975 } 976 977 // now replay the region open event that should contain new file locations 978 LOG.info("Testing replaying region open event " + regionEvents.get(2)); 979 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2)); 980 981 // assert that the flush files are picked 982 expectedStoreFileCount = 2; // two flushes happened 983 for (HStore s : secondaryRegion.getStores()) { 984 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 985 } 986 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 987 MemStoreSize newSnapshotSize = store.getSnapshotSize(); 988 assertTrue(newSnapshotSize.getDataSize() == 0); 989 990 // assert that the region memstore is empty 991 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 992 assertTrue(newRegionMemstoreSize == 0); 993 994 assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if 995 // any 996 997 LOG.info("-- Verifying edits from secondary"); 998 verifyData(secondaryRegion, 0, numRows, cq, families); 999 1000 LOG.info("-- Verifying edits from primary."); 1001 verifyData(primaryRegion, 0, numRows, cq, families); 1002 } 1003 1004 /** 1005 * Tests whether edits coming in for replay are skipped which have smaller seq id than the seqId 1006 * of the last replayed region open event. 1007 */ 1008 @Test 1009 public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException { 1010 putDataWithFlushes(primaryRegion, 100, 100, 0); 1011 int numRows = 100; 1012 1013 // close the region and open again. 1014 primaryRegion.close(); 1015 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1016 1017 // now replay the edits and the flush marker 1018 reader = createWALReaderForPrimary(); 1019 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 1020 List<WAL.Entry> edits = Lists.newArrayList(); 1021 1022 LOG.info("-- Replaying edits and region events in secondary"); 1023 while (true) { 1024 WAL.Entry entry = reader.next(); 1025 if (entry == null) { 1026 break; 1027 } 1028 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1029 RegionEventDescriptor regionEventDesc = 1030 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 1031 1032 if (flushDesc != null) { 1033 // don't replay flushes 1034 } else if (regionEventDesc != null) { 1035 regionEvents.add(regionEventDesc); 1036 } else { 1037 edits.add(entry); 1038 } 1039 } 1040 1041 // replay the region open of first open, but with the seqid of the second open 1042 // this way non of the flush files will be picked up. 1043 secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder(regionEvents.get(0)) 1044 .setLogSequenceNumber(regionEvents.get(2).getLogSequenceNumber()).build()); 1045 1046 // replay edits from the before region close. If replay does not 1047 // skip these the following verification will NOT fail. 1048 for (WAL.Entry entry : edits) { 1049 replayEdit(secondaryRegion, entry); 1050 } 1051 1052 boolean expectedFail = false; 1053 try { 1054 verifyData(secondaryRegion, 0, numRows, cq, families); 1055 } catch (AssertionError e) { 1056 expectedFail = true; // expected 1057 } 1058 if (!expectedFail) { 1059 fail("Should have failed this verification"); 1060 } 1061 } 1062 1063 @Test 1064 public void testReplayFlushSeqIds() throws IOException { 1065 // load some data to primary and flush 1066 int start = 0; 1067 LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100)); 1068 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families); 1069 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 1070 primaryRegion.flush(true); 1071 1072 // now replay the flush marker 1073 reader = createWALReaderForPrimary(); 1074 1075 long flushSeqId = -1; 1076 LOG.info("-- Replaying flush events in secondary"); 1077 while (true) { 1078 WAL.Entry entry = reader.next(); 1079 if (entry == null) { 1080 break; 1081 } 1082 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1083 if (flushDesc != null) { 1084 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 1085 LOG.info("-- Replaying flush start in secondary"); 1086 secondaryRegion.replayWALFlushStartMarker(flushDesc); 1087 flushSeqId = flushDesc.getFlushSequenceNumber(); 1088 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 1089 LOG.info("-- Replaying flush commit in secondary"); 1090 secondaryRegion.replayWALFlushCommitMarker(flushDesc); 1091 assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber()); 1092 } 1093 } 1094 // else do not replay 1095 } 1096 1097 // TODO: what to do with this? 1098 // assert that the newly picked up flush file is visible 1099 long readPoint = secondaryRegion.getMVCC().getReadPoint(); 1100 assertEquals(flushSeqId, readPoint); 1101 1102 // after replay verify that everything is still visible 1103 verifyData(secondaryRegion, 0, 100, cq, families); 1104 } 1105 1106 @Test 1107 public void testSeqIdsFromReplay() throws IOException { 1108 // test the case where seqId's coming from replayed WALEdits are made persisted with their 1109 // original seqIds and they are made visible through mvcc read point upon replay 1110 String method = name.getMethodName(); 1111 byte[] tableName = Bytes.toBytes(method); 1112 byte[] family = Bytes.toBytes("family"); 1113 1114 HRegion region = initHRegion(tableName, family); 1115 try { 1116 // replay an entry that is bigger than current read point 1117 long readPoint = region.getMVCC().getReadPoint(); 1118 long origSeqId = readPoint + 100; 1119 1120 Put put = new Put(row).addColumn(family, row, row); 1121 put.setDurability(Durability.SKIP_WAL); // we replay with skip wal 1122 replay(region, put, origSeqId); 1123 1124 // read point should have advanced to this seqId 1125 assertGet(region, family, row); 1126 1127 // region seqId should have advanced at least to this seqId 1128 assertEquals(origSeqId, region.getReadPoint(null)); 1129 1130 // replay an entry that is smaller than current read point 1131 // caution: adding an entry below current read point might cause partial dirty reads. Normal 1132 // replay does not allow reads while replay is going on. 1133 put = new Put(row2).addColumn(family, row2, row2); 1134 put.setDurability(Durability.SKIP_WAL); 1135 replay(region, put, origSeqId - 50); 1136 1137 assertGet(region, family, row2); 1138 } finally { 1139 region.close(); 1140 } 1141 } 1142 1143 /** 1144 * Tests that a region opened in secondary mode would not write region open / close events to its 1145 * WAL. 1146 */ 1147 @Test 1148 public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException { 1149 secondaryRegion.close(); 1150 walSecondary = spy(walSecondary); 1151 1152 // test for region open and close 1153 secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null); 1154 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1155 any(WALEdit.class)); 1156 1157 // test for replay prepare flush 1158 putDataByReplay(secondaryRegion, 0, 10, cq, families); 1159 secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder() 1160 .setFlushSequenceNumber(10) 1161 .setTableName(UnsafeByteOperations 1162 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1163 .setAction(FlushAction.START_FLUSH) 1164 .setEncodedRegionName( 1165 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1166 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1167 .build()); 1168 1169 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1170 any(WALEdit.class)); 1171 1172 secondaryRegion.close(); 1173 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1174 any(WALEdit.class)); 1175 } 1176 1177 /** 1178 * Tests the reads enabled flag for the region. When unset all reads should be rejected 1179 */ 1180 @Test 1181 public void testRegionReadsEnabledFlag() throws IOException { 1182 1183 putDataByReplay(secondaryRegion, 0, 100, cq, families); 1184 1185 verifyData(secondaryRegion, 0, 100, cq, families); 1186 1187 // now disable reads 1188 secondaryRegion.setReadsEnabled(false); 1189 try { 1190 verifyData(secondaryRegion, 0, 100, cq, families); 1191 fail("Should have failed with IOException"); 1192 } catch (IOException ex) { 1193 // expected 1194 } 1195 1196 // verify that we can still replay data 1197 putDataByReplay(secondaryRegion, 100, 100, cq, families); 1198 1199 // now enable reads again 1200 secondaryRegion.setReadsEnabled(true); 1201 verifyData(secondaryRegion, 0, 200, cq, families); 1202 } 1203 1204 /** 1205 * Tests the case where a request for flush cache is sent to the region, but region cannot flush. 1206 * It should write the flush request marker instead. 1207 */ 1208 @Test 1209 public void testWriteFlushRequestMarker() throws IOException { 1210 // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false 1211 FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY); 1212 assertNotNull(result); 1213 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result); 1214 assertFalse(result.wroteFlushWalMarker); 1215 1216 // request flush again, but this time with writeFlushRequestWalMarker = true 1217 result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 1218 assertNotNull(result); 1219 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result); 1220 assertTrue(result.wroteFlushWalMarker); 1221 1222 List<FlushDescriptor> flushes = Lists.newArrayList(); 1223 reader = createWALReaderForPrimary(); 1224 while (true) { 1225 WAL.Entry entry = reader.next(); 1226 if (entry == null) { 1227 break; 1228 } 1229 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1230 if (flush != null) { 1231 flushes.add(flush); 1232 } 1233 } 1234 1235 assertEquals(1, flushes.size()); 1236 assertNotNull(flushes.get(0)); 1237 assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction()); 1238 } 1239 1240 /** 1241 * Test the case where the secondary region replica is not in reads enabled state because it is 1242 * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH flush 1243 * marker entry should restore the reads enabled status in the region and allow the reads to 1244 * continue. 1245 */ 1246 @Test 1247 public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException { 1248 disableReads(secondaryRegion); 1249 1250 // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from 1251 // triggered flush restores readsEnabled 1252 primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 1253 reader = createWALReaderForPrimary(); 1254 while (true) { 1255 WAL.Entry entry = reader.next(); 1256 if (entry == null) { 1257 break; 1258 } 1259 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1260 if (flush != null) { 1261 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1262 } 1263 } 1264 1265 // now reads should be enabled 1266 secondaryRegion.get(new Get(Bytes.toBytes(0))); 1267 } 1268 1269 /** 1270 * Test the case where the secondary region replica is not in reads enabled state because it is 1271 * waiting for a flush or region open marker from primary region. Replaying flush start and commit 1272 * entries should restore the reads enabled status in the region and allow the reads to continue. 1273 */ 1274 @Test 1275 public void testReplayingFlushRestoresReadsEnabledState() throws IOException { 1276 // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came 1277 // from triggered flush restores readsEnabled 1278 disableReads(secondaryRegion); 1279 1280 // put some data in primary 1281 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1282 primaryRegion.flush(true); 1283 // I seem to need to push more edits through so the WAL flushes on local fs. This was not 1284 // needed before HBASE-15028. Not sure whats up. I can see that we have not flushed if I 1285 // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content.. 1286 // Doing same check before HBASE-15028 I can see all edits flushed to the WAL. Somethings up 1287 // but can't figure it... and this is only test that seems to suffer this flush issue. 1288 // St.Ack 20160201 1289 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1290 1291 reader = createWALReaderForPrimary(); 1292 while (true) { 1293 WAL.Entry entry = reader.next(); 1294 LOG.info(Objects.toString(entry)); 1295 if (entry == null) { 1296 break; 1297 } 1298 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1299 if (flush != null) { 1300 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1301 } else { 1302 replayEdit(secondaryRegion, entry); 1303 } 1304 } 1305 1306 // now reads should be enabled 1307 verifyData(secondaryRegion, 0, 100, cq, families); 1308 } 1309 1310 /** 1311 * Test the case where the secondary region replica is not in reads enabled state because it is 1312 * waiting for a flush or region open marker from primary region. Replaying flush start and commit 1313 * entries should restore the reads enabled status in the region and allow the reads to continue. 1314 */ 1315 @Test 1316 public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException { 1317 // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came 1318 // from triggered flush restores readsEnabled 1319 disableReads(secondaryRegion); 1320 1321 // put some data in primary 1322 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1323 primaryRegion.flush(true); 1324 1325 reader = createWALReaderForPrimary(); 1326 while (true) { 1327 WAL.Entry entry = reader.next(); 1328 if (entry == null) { 1329 break; 1330 } 1331 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1332 if (flush != null) { 1333 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1334 } 1335 } 1336 1337 // now reads should be enabled 1338 verifyData(secondaryRegion, 0, 100, cq, families); 1339 } 1340 1341 /** 1342 * Test the case where the secondary region replica is not in reads enabled state because it is 1343 * waiting for a flush or region open marker from primary region. Replaying region open event 1344 * entry from primary should restore the reads enabled status in the region and allow the reads to 1345 * continue. 1346 */ 1347 @Test 1348 public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException { 1349 // Test case 3: Test that replaying region open event markers restores readsEnabled 1350 disableReads(secondaryRegion); 1351 1352 primaryRegion.close(); 1353 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1354 1355 reader = createWALReaderForPrimary(); 1356 while (true) { 1357 WAL.Entry entry = reader.next(); 1358 if (entry == null) { 1359 break; 1360 } 1361 1362 RegionEventDescriptor regionEventDesc = 1363 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 1364 1365 if (regionEventDesc != null) { 1366 secondaryRegion.replayWALRegionEventMarker(regionEventDesc); 1367 } 1368 } 1369 1370 // now reads should be enabled 1371 secondaryRegion.get(new Get(Bytes.toBytes(0))); 1372 } 1373 1374 @Test 1375 public void testRefresStoreFiles() throws IOException { 1376 assertEquals(0, primaryRegion.getStoreFileList(families).size()); 1377 assertEquals(0, secondaryRegion.getStoreFileList(families).size()); 1378 1379 // Test case 1: refresh with an empty region 1380 secondaryRegion.refreshStoreFiles(); 1381 assertEquals(0, secondaryRegion.getStoreFileList(families).size()); 1382 1383 // do one flush 1384 putDataWithFlushes(primaryRegion, 100, 100, 0); 1385 int numRows = 100; 1386 1387 // refresh the store file list, and ensure that the files are picked up. 1388 secondaryRegion.refreshStoreFiles(); 1389 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1390 secondaryRegion.getStoreFileList(families)); 1391 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); 1392 1393 LOG.info("-- Verifying edits from secondary"); 1394 verifyData(secondaryRegion, 0, numRows, cq, families); 1395 1396 // Test case 2: 3 some more flushes 1397 putDataWithFlushes(primaryRegion, 100, 300, 0); 1398 numRows = 300; 1399 1400 // refresh the store file list, and ensure that the files are picked up. 1401 secondaryRegion.refreshStoreFiles(); 1402 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1403 secondaryRegion.getStoreFileList(families)); 1404 assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size()); 1405 1406 LOG.info("-- Verifying edits from secondary"); 1407 verifyData(secondaryRegion, 0, numRows, cq, families); 1408 1409 if (FSUtils.WINDOWS) { 1410 // compaction cannot move files while they are open in secondary on windows. Skip remaining. 1411 return; 1412 } 1413 1414 // Test case 3: compact primary files 1415 primaryRegion.compactStores(); 1416 List<HRegion> regions = new ArrayList<>(); 1417 regions.add(primaryRegion); 1418 Mockito.doReturn(regions).when(rss).getRegions(); 1419 CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false); 1420 cleaner.chore(); 1421 secondaryRegion.refreshStoreFiles(); 1422 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1423 secondaryRegion.getStoreFileList(families)); 1424 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); 1425 1426 LOG.info("-- Verifying edits from secondary"); 1427 verifyData(secondaryRegion, 0, numRows, cq, families); 1428 1429 LOG.info("-- Replaying edits in secondary"); 1430 1431 // Test case 4: replay some edits, ensure that memstore is dropped. 1432 assertTrue(secondaryRegion.getMemStoreDataSize() == 0); 1433 putDataWithFlushes(primaryRegion, 400, 400, 0); 1434 numRows = 400; 1435 1436 reader = createWALReaderForPrimary(); 1437 while (true) { 1438 WAL.Entry entry = reader.next(); 1439 if (entry == null) { 1440 break; 1441 } 1442 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1443 if (flush != null) { 1444 // do not replay flush 1445 } else { 1446 replayEdit(secondaryRegion, entry); 1447 } 1448 } 1449 1450 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); 1451 1452 secondaryRegion.refreshStoreFiles(); 1453 1454 assertTrue(secondaryRegion.getMemStoreDataSize() == 0); 1455 1456 LOG.info("-- Verifying edits from primary"); 1457 verifyData(primaryRegion, 0, numRows, cq, families); 1458 LOG.info("-- Verifying edits from secondary"); 1459 verifyData(secondaryRegion, 0, numRows, cq, families); 1460 } 1461 1462 /** 1463 * Paths can be qualified or not. This does the assertion using String->Path conversion. 1464 */ 1465 private void assertPathListsEqual(List<String> list1, List<String> list2) { 1466 List<Path> l1 = new ArrayList<>(list1.size()); 1467 for (String path : list1) { 1468 l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); 1469 } 1470 List<Path> l2 = new ArrayList<>(list2.size()); 1471 for (String path : list2) { 1472 l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); 1473 } 1474 assertEquals(l1, l2); 1475 } 1476 1477 private void disableReads(HRegion region) { 1478 region.setReadsEnabled(false); 1479 try { 1480 verifyData(region, 0, 1, cq, families); 1481 fail("Should have failed with IOException"); 1482 } catch (IOException ex) { 1483 // expected 1484 } 1485 } 1486 1487 private void replay(HRegion region, Put put, long replaySeqId) throws IOException { 1488 put.setDurability(Durability.SKIP_WAL); 1489 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); 1490 region.batchReplay(new MutationReplay[] { mutation }, replaySeqId); 1491 } 1492 1493 /** 1494 * Tests replaying region open markers from primary region. Checks whether the files are picked up 1495 */ 1496 @Test 1497 public void testReplayBulkLoadEvent() throws IOException { 1498 LOG.info("testReplayBulkLoadEvent starts"); 1499 putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush 1500 1501 // close the region and open again. 1502 primaryRegion.close(); 1503 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1504 1505 // bulk load a file into primary region 1506 byte[] randomValues = new byte[20]; 1507 Bytes.random(randomValues); 1508 Path testPath = TEST_UTIL.getDataTestDirOnTestFS(); 1509 1510 List<Pair<byte[], String>> familyPaths = new ArrayList<>(); 1511 int expectedLoadFileCount = 0; 1512 for (byte[] family : families) { 1513 familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues))); 1514 expectedLoadFileCount++; 1515 } 1516 primaryRegion.bulkLoadHFiles(familyPaths, false, null); 1517 1518 // now replay the edits and the bulk load marker 1519 reader = createWALReaderForPrimary(); 1520 1521 LOG.info("-- Replaying edits and region events in secondary"); 1522 BulkLoadDescriptor bulkloadEvent = null; 1523 while (true) { 1524 WAL.Entry entry = reader.next(); 1525 if (entry == null) { 1526 break; 1527 } 1528 bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0)); 1529 if (bulkloadEvent != null) { 1530 break; 1531 } 1532 } 1533 1534 // we should have 1 bulk load event 1535 assertTrue(bulkloadEvent != null); 1536 assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount()); 1537 1538 // replay the bulk load event 1539 secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent); 1540 1541 List<String> storeFileName = new ArrayList<>(); 1542 for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) { 1543 storeFileName.addAll(storeDesc.getStoreFileList()); 1544 } 1545 // assert that the bulk loaded files are picked 1546 for (HStore s : secondaryRegion.getStores()) { 1547 for (HStoreFile sf : s.getStorefiles()) { 1548 storeFileName.remove(sf.getPath().getName()); 1549 } 1550 } 1551 assertTrue("Found some store file isn't loaded:" + storeFileName, storeFileName.isEmpty()); 1552 1553 LOG.info("-- Verifying edits from secondary"); 1554 for (byte[] family : families) { 1555 assertGet(secondaryRegion, family, randomValues); 1556 } 1557 } 1558 1559 @Test 1560 public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException { 1561 // tests replaying flush commit marker, but the flush file has already been compacted 1562 // from primary and also deleted from the archive directory 1563 secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder() 1564 .setFlushSequenceNumber(Long.MAX_VALUE) 1565 .setTableName(UnsafeByteOperations 1566 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1567 .setAction(FlushAction.COMMIT_FLUSH) 1568 .setEncodedRegionName( 1569 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1570 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1571 .addStoreFlushes(StoreFlushDescriptor.newBuilder() 1572 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1573 .setStoreHomeDir("/store_home_dir").addFlushOutput("/foo/baz/123").build()) 1574 .build()); 1575 } 1576 1577 @Test 1578 public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException { 1579 // tests replaying compaction marker, but the compaction output file has already been compacted 1580 // from primary and also deleted from the archive directory 1581 secondaryRegion 1582 .replayWALCompactionMarker( 1583 CompactionDescriptor.newBuilder() 1584 .setTableName(UnsafeByteOperations 1585 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1586 .setEncodedRegionName( 1587 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1588 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])).addCompactionInput("/123") 1589 .addCompactionOutput("/456").setStoreHomeDir("/store_home_dir") 1590 .setRegionName( 1591 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1592 .build(), 1593 true, true, Long.MAX_VALUE); 1594 } 1595 1596 @Test 1597 public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException { 1598 // tests replaying region open event marker, but the region files have already been compacted 1599 // from primary and also deleted from the archive directory 1600 secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder() 1601 .setTableName(UnsafeByteOperations 1602 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1603 .setEncodedRegionName( 1604 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1605 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1606 .setEventType(EventType.REGION_OPEN) 1607 .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1))) 1608 .setLogSequenceNumber(Long.MAX_VALUE) 1609 .addStores( 1610 StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1611 .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build()) 1612 .build()); 1613 } 1614 1615 @Test 1616 public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException { 1617 // tests replaying bulk load event marker, but the bulk load files have already been compacted 1618 // from primary and also deleted from the archive directory 1619 secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder() 1620 .setTableName( 1621 ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName())) 1622 .setEncodedRegionName( 1623 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1624 .setBulkloadSeqNum(Long.MAX_VALUE) 1625 .addStores( 1626 StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1627 .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build()) 1628 .build()); 1629 } 1630 1631 private String createHFileForFamilies(Path testPath, byte[] family, byte[] valueBytes) 1632 throws IOException { 1633 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration()); 1634 // TODO We need a way to do this without creating files 1635 Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString()); 1636 FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile); 1637 try { 1638 hFileFactory.withOutputStream(out); 1639 hFileFactory.withFileContext(new HFileContextBuilder().build()); 1640 HFile.Writer writer = hFileFactory.create(); 1641 try { 1642 writer.append(new KeyValue(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) 1643 .setRow(valueBytes).setFamily(family).setQualifier(valueBytes).setTimestamp(0L) 1644 .setType(KeyValue.Type.Put.getCode()).setValue(valueBytes).build())); 1645 } finally { 1646 writer.close(); 1647 } 1648 } finally { 1649 out.close(); 1650 } 1651 return testFile.toString(); 1652 } 1653 1654 /** 1655 * Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does a flush 1656 * every flushInterval number of records. Then it puts numRowsAfterFlush number of more rows but 1657 * does not execute flush after 1658 */ 1659 private void putDataWithFlushes(HRegion region, int flushInterval, int numRows, 1660 int numRowsAfterFlush) throws IOException { 1661 int start = 0; 1662 for (; start < numRows; start += flushInterval) { 1663 LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval)); 1664 putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families); 1665 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 1666 region.flush(true); 1667 } 1668 LOG.info("-- Writing some more data to primary, not flushing"); 1669 putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families); 1670 } 1671 1672 private void putDataByReplay(HRegion region, int startRow, int numRows, byte[] qf, 1673 byte[]... families) throws IOException { 1674 for (int i = startRow; i < startRow + numRows; i++) { 1675 Put put = new Put(Bytes.toBytes("" + i)); 1676 put.setDurability(Durability.SKIP_WAL); 1677 for (byte[] family : families) { 1678 put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null); 1679 } 1680 replay(region, put, i + 1); 1681 } 1682 } 1683 1684 private static HRegion initHRegion(byte[] tableName, byte[]... families) throws IOException { 1685 return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), HConstants.EMPTY_START_ROW, 1686 HConstants.EMPTY_END_ROW, CONF, false, Durability.SYNC_WAL, null, families); 1687 } 1688}