001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet; 021import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData; 022import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNotNull; 026import static org.junit.Assert.assertNull; 027import static org.junit.Assert.assertTrue; 028import static org.junit.Assert.fail; 029import static org.mockito.ArgumentMatchers.any; 030import static org.mockito.Mockito.mock; 031import static org.mockito.Mockito.spy; 032import static org.mockito.Mockito.times; 033import static org.mockito.Mockito.verify; 034import static org.mockito.Mockito.when; 035 036import java.io.FileNotFoundException; 037import java.io.IOException; 038import java.util.ArrayList; 039import java.util.List; 040import java.util.Map; 041import java.util.Objects; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.fs.FSDataOutputStream; 044import org.apache.hadoop.fs.Path; 045import org.apache.hadoop.hbase.Cell; 046import org.apache.hadoop.hbase.CellUtil; 047import org.apache.hadoop.hbase.HBaseClassTestRule; 048import org.apache.hadoop.hbase.HBaseTestingUtility; 049import org.apache.hadoop.hbase.HConstants; 050import org.apache.hadoop.hbase.KeyValue; 051import org.apache.hadoop.hbase.ServerName; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 054import org.apache.hadoop.hbase.client.Durability; 055import org.apache.hadoop.hbase.client.Get; 056import org.apache.hadoop.hbase.client.Put; 057import org.apache.hadoop.hbase.client.RegionInfo; 058import org.apache.hadoop.hbase.client.RegionInfoBuilder; 059import org.apache.hadoop.hbase.client.TableDescriptor; 060import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 061import org.apache.hadoop.hbase.executor.ExecutorService; 062import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig; 063import org.apache.hadoop.hbase.executor.ExecutorType; 064import org.apache.hadoop.hbase.io.hfile.HFile; 065import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 066import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl; 067import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; 068import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 069import org.apache.hadoop.hbase.testclassification.LargeTests; 070import org.apache.hadoop.hbase.util.Bytes; 071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 072import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 073import org.apache.hadoop.hbase.util.FSUtils; 074import org.apache.hadoop.hbase.util.Pair; 075import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 076import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader; 077import org.apache.hadoop.hbase.wal.WAL; 078import org.apache.hadoop.hbase.wal.WALEdit; 079import org.apache.hadoop.hbase.wal.WALFactory; 080import org.apache.hadoop.hbase.wal.WALKeyImpl; 081import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 082import org.apache.hadoop.hbase.wal.WALStreamReader; 083import org.apache.hadoop.util.StringUtils; 084import org.junit.After; 085import org.junit.AfterClass; 086import org.junit.Before; 087import org.junit.BeforeClass; 088import org.junit.ClassRule; 089import org.junit.Rule; 090import org.junit.Test; 091import org.junit.experimental.categories.Category; 092import org.junit.rules.TestName; 093import org.mockito.Mockito; 094import org.slf4j.Logger; 095import org.slf4j.LoggerFactory; 096 097import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 098import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 099 100import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 101import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 102import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 103import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 104import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; 107import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 108import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; 109import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 110 111/** 112 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary 113 * region replicas 114 */ 115@Category(LargeTests.class) 116public class TestHRegionReplayEvents { 117 118 @ClassRule 119 public static final HBaseClassTestRule CLASS_RULE = 120 HBaseClassTestRule.forClass(TestHRegionReplayEvents.class); 121 122 private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class); 123 @Rule 124 public TestName name = new TestName(); 125 126 private static HBaseTestingUtility TEST_UTIL; 127 128 public static Configuration CONF; 129 private String dir; 130 131 private byte[][] families = 132 new byte[][] { Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") }; 133 134 // Test names 135 protected byte[] tableName; 136 protected String method; 137 protected final byte[] row = Bytes.toBytes("rowA"); 138 protected final byte[] row2 = Bytes.toBytes("rowB"); 139 protected byte[] cq = Bytes.toBytes("cq"); 140 141 // per test fields 142 private Path rootDir; 143 private TableDescriptor htd; 144 private RegionServerServices rss; 145 private RegionInfo primaryHri, secondaryHri; 146 private HRegion primaryRegion, secondaryRegion; 147 private WAL walPrimary, walSecondary; 148 private WALStreamReader reader; 149 150 @BeforeClass 151 public static void setUpBeforeClass() throws Exception { 152 TEST_UTIL = new HBaseTestingUtility(); 153 TEST_UTIL.startMiniDFSCluster(1); 154 } 155 156 @AfterClass 157 public static void tearDownAfterClass() throws Exception { 158 LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir()); 159 TEST_UTIL.cleanupTestDir(); 160 TEST_UTIL.shutdownMiniDFSCluster(); 161 } 162 163 @Before 164 public void setUp() throws Exception { 165 CONF = TEST_UTIL.getConfiguration(); 166 dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString(); 167 method = name.getMethodName(); 168 tableName = Bytes.toBytes(name.getMethodName()); 169 rootDir = new Path(dir + method); 170 TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString()); 171 method = name.getMethodName(); 172 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method)); 173 for (byte[] family : families) { 174 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 175 } 176 htd = builder.build(); 177 178 long time = EnvironmentEdgeManager.currentTime(); 179 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 180 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 181 primaryHri = 182 RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build(); 183 secondaryHri = 184 RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build(); 185 186 WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir); 187 walPrimary = wals.getWAL(primaryHri); 188 walSecondary = wals.getWAL(secondaryHri); 189 190 rss = mock(RegionServerServices.class); 191 when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1)); 192 when(rss.getConfiguration()).thenReturn(CONF); 193 when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF)); 194 String string = 195 org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER.toString(); 196 ExecutorService es = new ExecutorService(string); 197 es.startExecutorService(es.new ExecutorConfig().setCorePoolSize(1) 198 .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)); 199 when(rss.getExecutorService()).thenReturn(es); 200 primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary); 201 primaryRegion.close(); 202 List<HRegion> regions = new ArrayList<>(); 203 regions.add(primaryRegion); 204 Mockito.doReturn(regions).when(rss).getRegions(); 205 206 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 207 secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null); 208 209 reader = null; 210 } 211 212 @After 213 public void tearDown() throws Exception { 214 if (reader != null) { 215 reader.close(); 216 } 217 218 if (primaryRegion != null) { 219 HBaseTestingUtility.closeRegionAndWAL(primaryRegion); 220 } 221 if (secondaryRegion != null) { 222 HBaseTestingUtility.closeRegionAndWAL(secondaryRegion); 223 } 224 225 EnvironmentEdgeManagerTestHelper.reset(); 226 } 227 228 String getName() { 229 return name.getMethodName(); 230 } 231 232 // Some of the test cases are as follows: 233 // 1. replay flush start marker again 234 // 2. replay flush with smaller seqId than what is there in memstore snapshot 235 // 3. replay flush with larger seqId than what is there in memstore snapshot 236 // 4. replay flush commit without flush prepare. non droppable memstore 237 // 5. replay flush commit without flush prepare. droppable memstore 238 // 6. replay open region event 239 // 7. replay open region event after flush start 240 // 8. replay flush form an earlier seqId (test ignoring seqIds) 241 // 9. start flush does not prevent region from closing. 242 243 @Test 244 public void testRegionReplicaSecondaryCannotFlush() throws IOException { 245 // load some data and flush ensure that the secondary replica will not execute the flush 246 247 // load some data to secondary by replaying 248 putDataByReplay(secondaryRegion, 0, 1000, cq, families); 249 250 verifyData(secondaryRegion, 0, 1000, cq, families); 251 252 // flush region 253 FlushResultImpl flush = (FlushResultImpl) secondaryRegion.flush(true); 254 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result); 255 256 verifyData(secondaryRegion, 0, 1000, cq, families); 257 258 // close the region, and inspect that it has not flushed 259 Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false); 260 // assert that there are no files (due to flush) 261 for (List<HStoreFile> f : files.values()) { 262 assertTrue(f.isEmpty()); 263 } 264 } 265 266 /** 267 * Tests a case where we replay only a flush start marker, then the region is closed. This region 268 * should not block indefinitely 269 */ 270 @Test 271 public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException { 272 // load some data to primary and flush 273 int start = 0; 274 LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100)); 275 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families); 276 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 277 primaryRegion.flush(true); 278 279 // now replay the edits and the flush marker 280 reader = createWALReaderForPrimary(); 281 282 LOG.info("-- Replaying edits and flush events in secondary"); 283 while (true) { 284 WAL.Entry entry = reader.next(); 285 if (entry == null) { 286 break; 287 } 288 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 289 if (flushDesc != null) { 290 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 291 LOG.info("-- Replaying flush start in secondary"); 292 secondaryRegion.replayWALFlushStartMarker(flushDesc); 293 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 294 LOG.info("-- NOT Replaying flush commit in secondary"); 295 } 296 } else { 297 replayEdit(secondaryRegion, entry); 298 } 299 } 300 301 assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0); 302 // now close the region which should not cause hold because of un-committed flush 303 secondaryRegion.close(); 304 305 // verify that the memstore size is back to what it was 306 assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize()); 307 } 308 309 static int replayEdit(HRegion region, WAL.Entry entry) throws IOException { 310 if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) { 311 return 0; // handled elsewhere 312 } 313 Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0))); 314 for (Cell cell : entry.getEdit().getCells()) 315 put.add(cell); 316 put.setDurability(Durability.SKIP_WAL); 317 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); 318 region.batchReplay(new MutationReplay[] { mutation }, entry.getKey().getSequenceId()); 319 return Integer.parseInt(Bytes.toString(put.getRow())); 320 } 321 322 private WALStreamReader createWALReaderForPrimary() throws FileNotFoundException, IOException { 323 return NoEOFWALStreamReader.create(TEST_UTIL.getTestFileSystem(), 324 AbstractFSWALProvider.getCurrentFileName(walPrimary), TEST_UTIL.getConfiguration()); 325 } 326 327 @Test 328 public void testBatchReplayWithMultipleNonces() throws IOException { 329 try { 330 MutationReplay[] mutations = new MutationReplay[100]; 331 for (int i = 0; i < 100; i++) { 332 Put put = new Put(Bytes.toBytes(i)); 333 put.setDurability(Durability.SYNC_WAL); 334 for (byte[] familly : this.families) { 335 put.addColumn(familly, this.cq, null); 336 long nonceNum = i / 10; 337 mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum); 338 } 339 } 340 primaryRegion.batchReplay(mutations, 20); 341 } catch (Exception e) { 342 String msg = "Error while replay of batch with multiple nonces. "; 343 LOG.error(msg, e); 344 fail(msg + e.getMessage()); 345 } 346 } 347 348 @Test 349 public void testReplayFlushesAndCompactions() throws IOException { 350 // initiate a secondary region with some data. 351 352 // load some data to primary and flush. 3 flushes and some more unflushed data 353 putDataWithFlushes(primaryRegion, 100, 300, 100); 354 355 // compaction from primary 356 LOG.info("-- Compacting primary, only 1 store"); 357 primaryRegion.compactStore(Bytes.toBytes("cf1"), NoLimitThroughputController.INSTANCE); 358 359 // now replay the edits and the flush marker 360 reader = createWALReaderForPrimary(); 361 362 LOG.info("-- Replaying edits and flush events in secondary"); 363 int lastReplayed = 0; 364 int expectedStoreFileCount = 0; 365 while (true) { 366 WAL.Entry entry = reader.next(); 367 if (entry == null) { 368 break; 369 } 370 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 371 CompactionDescriptor compactionDesc = 372 WALEdit.getCompaction(entry.getEdit().getCells().get(0)); 373 if (flushDesc != null) { 374 // first verify that everything is replayed and visible before flush event replay 375 verifyData(secondaryRegion, 0, lastReplayed, cq, families); 376 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 377 long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); 378 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 379 MemStoreSize mss = store.getFlushableSize(); 380 long storeSize = store.getSize(); 381 long storeSizeUncompressed = store.getStoreSizeUncompressed(); 382 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 383 LOG.info("-- Replaying flush start in secondary"); 384 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc); 385 assertNull(result.result); 386 assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber()); 387 388 // assert that the store memstore is smaller now 389 long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); 390 LOG.info("Memstore size reduced by:" 391 + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); 392 assertTrue(storeMemstoreSize > newStoreMemstoreSize); 393 394 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 395 LOG.info("-- Replaying flush commit in secondary"); 396 secondaryRegion.replayWALFlushCommitMarker(flushDesc); 397 398 // assert that the flush files are picked 399 expectedStoreFileCount++; 400 for (HStore s : secondaryRegion.getStores()) { 401 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 402 } 403 MemStoreSize newMss = store.getFlushableSize(); 404 assertTrue(mss.getHeapSize() > newMss.getHeapSize()); 405 406 // assert that the region memstore is smaller now 407 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 408 assertTrue(regionMemstoreSize > newRegionMemstoreSize); 409 410 // assert that the store sizes are bigger 411 assertTrue(store.getSize() > storeSize); 412 assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed); 413 assertEquals(store.getSize(), store.getStorefilesSize()); 414 } 415 // after replay verify that everything is still visible 416 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 417 } else if (compactionDesc != null) { 418 secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE); 419 420 // assert that the compaction is applied 421 for (HStore store : secondaryRegion.getStores()) { 422 if (store.getColumnFamilyName().equals("cf1")) { 423 assertEquals(1, store.getStorefilesCount()); 424 } else { 425 assertEquals(expectedStoreFileCount, store.getStorefilesCount()); 426 } 427 } 428 } else { 429 lastReplayed = replayEdit(secondaryRegion, entry); 430 ; 431 } 432 } 433 434 assertEquals(400 - 1, lastReplayed); 435 LOG.info("-- Verifying edits from secondary"); 436 verifyData(secondaryRegion, 0, 400, cq, families); 437 438 LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted"); 439 verifyData(primaryRegion, 0, lastReplayed, cq, families); 440 for (HStore store : primaryRegion.getStores()) { 441 if (store.getColumnFamilyName().equals("cf1")) { 442 assertEquals(1, store.getStorefilesCount()); 443 } else { 444 assertEquals(expectedStoreFileCount, store.getStorefilesCount()); 445 } 446 } 447 } 448 449 /** 450 * Tests cases where we prepare a flush with some seqId and we receive other flush start markers 451 * equal to, greater or less than the previous flush start marker. 452 */ 453 @Test 454 public void testReplayFlushStartMarkers() throws IOException { 455 // load some data to primary and flush. 1 flush and some more unflushed data 456 putDataWithFlushes(primaryRegion, 100, 100, 100); 457 int numRows = 200; 458 459 // now replay the edits and the flush marker 460 reader = createWALReaderForPrimary(); 461 462 LOG.info("-- Replaying edits and flush events in secondary"); 463 464 FlushDescriptor startFlushDesc = null; 465 466 int lastReplayed = 0; 467 while (true) { 468 WAL.Entry entry = reader.next(); 469 if (entry == null) { 470 break; 471 } 472 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 473 if (flushDesc != null) { 474 // first verify that everything is replayed and visible before flush event replay 475 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 476 long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); 477 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 478 MemStoreSize mss = store.getFlushableSize(); 479 480 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 481 startFlushDesc = flushDesc; 482 LOG.info("-- Replaying flush start in secondary"); 483 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 484 assertNull(result.result); 485 assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber()); 486 assertTrue(regionMemstoreSize > 0); 487 assertTrue(mss.getHeapSize() > 0); 488 489 // assert that the store memstore is smaller now 490 long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); 491 LOG.info("Memstore size reduced by:" 492 + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); 493 assertTrue(storeMemstoreSize > newStoreMemstoreSize); 494 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 495 496 } 497 // after replay verify that everything is still visible 498 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 499 } else { 500 lastReplayed = replayEdit(secondaryRegion, entry); 501 } 502 } 503 504 // at this point, there should be some data (rows 0-100) in memstore snapshot 505 // and some more data in memstores (rows 100-200) 506 507 verifyData(secondaryRegion, 0, numRows, cq, families); 508 509 // Test case 1: replay the same flush start marker again 510 LOG.info("-- Replaying same flush start in secondary again"); 511 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 512 assertNull(result); // this should return null. Ignoring the flush start marker 513 // assert that we still have prepared flush with the previous setup. 514 assertNotNull(secondaryRegion.getPrepareFlushResult()); 515 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 516 startFlushDesc.getFlushSequenceNumber()); 517 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 518 verifyData(secondaryRegion, 0, numRows, cq, families); 519 520 // Test case 2: replay a flush start marker with a smaller seqId 521 FlushDescriptor startFlushDescSmallerSeqId = 522 clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50); 523 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId); 524 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId); 525 assertNull(result); // this should return null. Ignoring the flush start marker 526 // assert that we still have prepared flush with the previous setup. 527 assertNotNull(secondaryRegion.getPrepareFlushResult()); 528 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 529 startFlushDesc.getFlushSequenceNumber()); 530 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 531 verifyData(secondaryRegion, 0, numRows, cq, families); 532 533 // Test case 3: replay a flush start marker with a larger seqId 534 FlushDescriptor startFlushDescLargerSeqId = 535 clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50); 536 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId); 537 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId); 538 assertNull(result); // this should return null. Ignoring the flush start marker 539 // assert that we still have prepared flush with the previous setup. 540 assertNotNull(secondaryRegion.getPrepareFlushResult()); 541 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 542 startFlushDesc.getFlushSequenceNumber()); 543 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 544 verifyData(secondaryRegion, 0, numRows, cq, families); 545 546 LOG.info("-- Verifying edits from secondary"); 547 verifyData(secondaryRegion, 0, numRows, cq, families); 548 549 LOG.info("-- Verifying edits from primary."); 550 verifyData(primaryRegion, 0, numRows, cq, families); 551 } 552 553 /** 554 * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker 555 * less than the previous flush start marker. 556 */ 557 @Test 558 public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException { 559 // load some data to primary and flush. 2 flushes and some more unflushed data 560 putDataWithFlushes(primaryRegion, 100, 200, 100); 561 int numRows = 300; 562 563 // now replay the edits and the flush marker 564 reader = createWALReaderForPrimary(); 565 566 LOG.info("-- Replaying edits and flush events in secondary"); 567 FlushDescriptor startFlushDesc = null; 568 FlushDescriptor commitFlushDesc = null; 569 570 int lastReplayed = 0; 571 while (true) { 572 System.out.println(lastReplayed); 573 WAL.Entry entry = reader.next(); 574 if (entry == null) { 575 break; 576 } 577 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 578 if (flushDesc != null) { 579 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 580 // don't replay the first flush start marker, hold on to it, replay the second one 581 if (startFlushDesc == null) { 582 startFlushDesc = flushDesc; 583 } else { 584 LOG.info("-- Replaying flush start in secondary"); 585 startFlushDesc = flushDesc; 586 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 587 assertNull(result.result); 588 } 589 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 590 // do not replay any flush commit yet 591 if (commitFlushDesc == null) { 592 commitFlushDesc = flushDesc; // hold on to the first flush commit marker 593 } 594 } 595 // after replay verify that everything is still visible 596 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 597 } else { 598 lastReplayed = replayEdit(secondaryRegion, entry); 599 } 600 } 601 602 // at this point, there should be some data (rows 0-200) in memstore snapshot 603 // and some more data in memstores (rows 200-300) 604 verifyData(secondaryRegion, 0, numRows, cq, families); 605 606 // no store files in the region 607 int expectedStoreFileCount = 0; 608 for (HStore s : secondaryRegion.getStores()) { 609 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 610 } 611 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 612 613 // Test case 1: replay the a flush commit marker smaller than what we have prepared 614 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START" 615 + startFlushDesc); 616 assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber()); 617 618 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 619 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 620 621 // assert that the flush files are picked 622 expectedStoreFileCount++; 623 for (HStore s : secondaryRegion.getStores()) { 624 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 625 } 626 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 627 MemStoreSize mss = store.getFlushableSize(); 628 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 629 630 // assert that the region memstore is same as before 631 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 632 assertEquals(regionMemstoreSize, newRegionMemstoreSize); 633 634 assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped 635 636 LOG.info("-- Verifying edits from secondary"); 637 verifyData(secondaryRegion, 0, numRows, cq, families); 638 639 LOG.info("-- Verifying edits from primary."); 640 verifyData(primaryRegion, 0, numRows, cq, families); 641 } 642 643 /** 644 * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker 645 * larger than the previous flush start marker. 646 */ 647 @Test 648 public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException { 649 // load some data to primary and flush. 1 flush and some more unflushed data 650 putDataWithFlushes(primaryRegion, 100, 100, 100); 651 int numRows = 200; 652 653 // now replay the edits and the flush marker 654 reader = createWALReaderForPrimary(); 655 656 LOG.info("-- Replaying edits and flush events in secondary"); 657 FlushDescriptor startFlushDesc = null; 658 FlushDescriptor commitFlushDesc = null; 659 660 int lastReplayed = 0; 661 while (true) { 662 WAL.Entry entry = reader.next(); 663 if (entry == null) { 664 break; 665 } 666 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 667 if (flushDesc != null) { 668 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 669 if (startFlushDesc == null) { 670 LOG.info("-- Replaying flush start in secondary"); 671 startFlushDesc = flushDesc; 672 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 673 assertNull(result.result); 674 } 675 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 676 // do not replay any flush commit yet 677 // hold on to the flush commit marker but simulate a larger 678 // flush commit seqId 679 commitFlushDesc = FlushDescriptor.newBuilder(flushDesc) 680 .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50).build(); 681 } 682 // after replay verify that everything is still visible 683 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 684 } else { 685 lastReplayed = replayEdit(secondaryRegion, entry); 686 } 687 } 688 689 // at this point, there should be some data (rows 0-100) in memstore snapshot 690 // and some more data in memstores (rows 100-200) 691 verifyData(secondaryRegion, 0, numRows, cq, families); 692 693 // no store files in the region 694 int expectedStoreFileCount = 0; 695 for (HStore s : secondaryRegion.getStores()) { 696 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 697 } 698 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 699 700 // Test case 1: replay the a flush commit marker larger than what we have prepared 701 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START" 702 + startFlushDesc); 703 assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber()); 704 705 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 706 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 707 708 // assert that the flush files are picked 709 expectedStoreFileCount++; 710 for (HStore s : secondaryRegion.getStores()) { 711 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 712 } 713 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 714 MemStoreSize mss = store.getFlushableSize(); 715 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 716 717 // assert that the region memstore is smaller than before, but not empty 718 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 719 assertTrue(newRegionMemstoreSize > 0); 720 assertTrue(regionMemstoreSize > newRegionMemstoreSize); 721 722 assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped 723 724 LOG.info("-- Verifying edits from secondary"); 725 verifyData(secondaryRegion, 0, numRows, cq, families); 726 727 LOG.info("-- Verifying edits from primary."); 728 verifyData(primaryRegion, 0, numRows, cq, families); 729 } 730 731 /** 732 * Tests the case where we receive a flush commit before receiving any flush prepare markers. The 733 * memstore edits should be dropped after the flush commit replay since they should be in flushed 734 * files 735 */ 736 @Test 737 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore() 738 throws IOException { 739 testReplayFlushCommitMarkerWithoutFlushStartMarker(true); 740 } 741 742 /** 743 * Tests the case where we receive a flush commit before receiving any flush prepare markers. The 744 * memstore edits should be not dropped after the flush commit replay since not every edit will be 745 * in flushed files (based on seqId) 746 */ 747 @Test 748 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore() 749 throws IOException { 750 testReplayFlushCommitMarkerWithoutFlushStartMarker(false); 751 } 752 753 /** 754 * Tests the case where we receive a flush commit before receiving any flush prepare markers 755 */ 756 public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore) 757 throws IOException { 758 // load some data to primary and flush. 1 flushes and some more unflushed data. 759 // write more data after flush depending on whether droppableSnapshot 760 putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100); 761 int numRows = droppableMemstore ? 100 : 200; 762 763 // now replay the edits and the flush marker 764 reader = createWALReaderForPrimary(); 765 766 LOG.info("-- Replaying edits and flush events in secondary"); 767 FlushDescriptor commitFlushDesc = null; 768 769 int lastReplayed = 0; 770 while (true) { 771 WAL.Entry entry = reader.next(); 772 if (entry == null) { 773 break; 774 } 775 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 776 if (flushDesc != null) { 777 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 778 // do not replay flush start marker 779 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 780 commitFlushDesc = flushDesc; // hold on to the flush commit marker 781 } 782 // after replay verify that everything is still visible 783 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 784 } else { 785 lastReplayed = replayEdit(secondaryRegion, entry); 786 } 787 } 788 789 // at this point, there should be some data (rows 0-200) in the memstore without snapshot 790 // and some more data in memstores (rows 100-300) 791 verifyData(secondaryRegion, 0, numRows, cq, families); 792 793 // no store files in the region 794 int expectedStoreFileCount = 0; 795 for (HStore s : secondaryRegion.getStores()) { 796 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 797 } 798 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 799 800 // Test case 1: replay a flush commit marker without start flush marker 801 assertNull(secondaryRegion.getPrepareFlushResult()); 802 assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0); 803 804 // ensure all files are visible in secondary 805 for (HStore store : secondaryRegion.getStores()) { 806 assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null)); 807 } 808 809 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 810 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 811 812 // assert that the flush files are picked 813 expectedStoreFileCount++; 814 for (HStore s : secondaryRegion.getStores()) { 815 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 816 } 817 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 818 MemStoreSize mss = store.getFlushableSize(); 819 if (droppableMemstore) { 820 // assert that the memstore is dropped 821 assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); 822 } else { 823 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 824 } 825 826 // assert that the region memstore is same as before (we could not drop) 827 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 828 if (droppableMemstore) { 829 assertTrue(0 == newRegionMemstoreSize); 830 } else { 831 assertTrue(regionMemstoreSize == newRegionMemstoreSize); 832 } 833 834 LOG.info("-- Verifying edits from secondary"); 835 verifyData(secondaryRegion, 0, numRows, cq, families); 836 837 LOG.info("-- Verifying edits from primary."); 838 verifyData(primaryRegion, 0, numRows, cq, families); 839 } 840 841 private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) { 842 return FlushDescriptor.newBuilder(flush).setFlushSequenceNumber(flushSeqId).build(); 843 } 844 845 /** 846 * Tests replaying region open markers from primary region. Checks whether the files are picked up 847 */ 848 @Test 849 public void testReplayRegionOpenEvent() throws IOException { 850 putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush 851 int numRows = 100; 852 853 // close the region and open again. 854 primaryRegion.close(); 855 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 856 857 // now replay the edits and the flush marker 858 reader = createWALReaderForPrimary(); 859 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 860 861 LOG.info("-- Replaying edits and region events in secondary"); 862 while (true) { 863 WAL.Entry entry = reader.next(); 864 if (entry == null) { 865 break; 866 } 867 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 868 RegionEventDescriptor regionEventDesc = 869 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 870 871 if (flushDesc != null) { 872 // don't replay flush events 873 } else if (regionEventDesc != null) { 874 regionEvents.add(regionEventDesc); 875 } else { 876 // don't replay edits 877 } 878 } 879 880 // we should have 1 open, 1 close and 1 open event 881 assertEquals(3, regionEvents.size()); 882 883 // replay the first region open event. 884 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0)); 885 886 // replay the close event as well 887 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1)); 888 889 // no store files in the region 890 int expectedStoreFileCount = 0; 891 for (HStore s : secondaryRegion.getStores()) { 892 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 893 } 894 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 895 assertTrue(regionMemstoreSize == 0); 896 897 // now replay the region open event that should contain new file locations 898 LOG.info("Testing replaying region open event " + regionEvents.get(2)); 899 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2)); 900 901 // assert that the flush files are picked 902 expectedStoreFileCount++; 903 for (HStore s : secondaryRegion.getStores()) { 904 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 905 } 906 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 907 MemStoreSize mss = store.getFlushableSize(); 908 assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); 909 910 // assert that the region memstore is empty 911 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 912 assertTrue(newRegionMemstoreSize == 0); 913 914 assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if 915 // any 916 917 LOG.info("-- Verifying edits from secondary"); 918 verifyData(secondaryRegion, 0, numRows, cq, families); 919 920 LOG.info("-- Verifying edits from primary."); 921 verifyData(primaryRegion, 0, numRows, cq, families); 922 } 923 924 /** 925 * Tests the case where we replay a region open event after a flush start but before receiving 926 * flush commit 927 */ 928 @Test 929 public void testReplayRegionOpenEventAfterFlushStart() throws IOException { 930 putDataWithFlushes(primaryRegion, 100, 100, 100); 931 int numRows = 200; 932 933 // close the region and open again. 934 primaryRegion.close(); 935 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 936 937 // now replay the edits and the flush marker 938 reader = createWALReaderForPrimary(); 939 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 940 941 LOG.info("-- Replaying edits and region events in secondary"); 942 while (true) { 943 WAL.Entry entry = reader.next(); 944 if (entry == null) { 945 break; 946 } 947 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 948 RegionEventDescriptor regionEventDesc = 949 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 950 951 if (flushDesc != null) { 952 // only replay flush start 953 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 954 secondaryRegion.replayWALFlushStartMarker(flushDesc); 955 } 956 } else if (regionEventDesc != null) { 957 regionEvents.add(regionEventDesc); 958 } else { 959 replayEdit(secondaryRegion, entry); 960 } 961 } 962 963 // at this point, there should be some data (rows 0-100) in the memstore snapshot 964 // and some more data in memstores (rows 100-200) 965 verifyData(secondaryRegion, 0, numRows, cq, families); 966 967 // we should have 1 open, 1 close and 1 open event 968 assertEquals(3, regionEvents.size()); 969 970 // no store files in the region 971 int expectedStoreFileCount = 0; 972 for (HStore s : secondaryRegion.getStores()) { 973 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 974 } 975 976 // now replay the region open event that should contain new file locations 977 LOG.info("Testing replaying region open event " + regionEvents.get(2)); 978 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2)); 979 980 // assert that the flush files are picked 981 expectedStoreFileCount = 2; // two flushes happened 982 for (HStore s : secondaryRegion.getStores()) { 983 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 984 } 985 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 986 MemStoreSize newSnapshotSize = store.getSnapshotSize(); 987 assertTrue(newSnapshotSize.getDataSize() == 0); 988 989 // assert that the region memstore is empty 990 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 991 assertTrue(newRegionMemstoreSize == 0); 992 993 assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if 994 // any 995 996 LOG.info("-- Verifying edits from secondary"); 997 verifyData(secondaryRegion, 0, numRows, cq, families); 998 999 LOG.info("-- Verifying edits from primary."); 1000 verifyData(primaryRegion, 0, numRows, cq, families); 1001 } 1002 1003 /** 1004 * Tests whether edits coming in for replay are skipped which have smaller seq id than the seqId 1005 * of the last replayed region open event. 1006 */ 1007 @Test 1008 public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException { 1009 putDataWithFlushes(primaryRegion, 100, 100, 0); 1010 int numRows = 100; 1011 1012 // close the region and open again. 1013 primaryRegion.close(); 1014 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1015 1016 // now replay the edits and the flush marker 1017 reader = createWALReaderForPrimary(); 1018 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 1019 List<WAL.Entry> edits = Lists.newArrayList(); 1020 1021 LOG.info("-- Replaying edits and region events in secondary"); 1022 while (true) { 1023 WAL.Entry entry = reader.next(); 1024 if (entry == null) { 1025 break; 1026 } 1027 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1028 RegionEventDescriptor regionEventDesc = 1029 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 1030 1031 if (flushDesc != null) { 1032 // don't replay flushes 1033 } else if (regionEventDesc != null) { 1034 regionEvents.add(regionEventDesc); 1035 } else { 1036 edits.add(entry); 1037 } 1038 } 1039 1040 // replay the region open of first open, but with the seqid of the second open 1041 // this way non of the flush files will be picked up. 1042 secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder(regionEvents.get(0)) 1043 .setLogSequenceNumber(regionEvents.get(2).getLogSequenceNumber()).build()); 1044 1045 // replay edits from the before region close. If replay does not 1046 // skip these the following verification will NOT fail. 1047 for (WAL.Entry entry : edits) { 1048 replayEdit(secondaryRegion, entry); 1049 } 1050 1051 boolean expectedFail = false; 1052 try { 1053 verifyData(secondaryRegion, 0, numRows, cq, families); 1054 } catch (AssertionError e) { 1055 expectedFail = true; // expected 1056 } 1057 if (!expectedFail) { 1058 fail("Should have failed this verification"); 1059 } 1060 } 1061 1062 @Test 1063 public void testReplayFlushSeqIds() throws IOException { 1064 // load some data to primary and flush 1065 int start = 0; 1066 LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100)); 1067 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families); 1068 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 1069 primaryRegion.flush(true); 1070 1071 // now replay the flush marker 1072 reader = createWALReaderForPrimary(); 1073 1074 long flushSeqId = -1; 1075 LOG.info("-- Replaying flush events in secondary"); 1076 while (true) { 1077 WAL.Entry entry = reader.next(); 1078 if (entry == null) { 1079 break; 1080 } 1081 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1082 if (flushDesc != null) { 1083 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 1084 LOG.info("-- Replaying flush start in secondary"); 1085 secondaryRegion.replayWALFlushStartMarker(flushDesc); 1086 flushSeqId = flushDesc.getFlushSequenceNumber(); 1087 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 1088 LOG.info("-- Replaying flush commit in secondary"); 1089 secondaryRegion.replayWALFlushCommitMarker(flushDesc); 1090 assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber()); 1091 } 1092 } 1093 // else do not replay 1094 } 1095 1096 // TODO: what to do with this? 1097 // assert that the newly picked up flush file is visible 1098 long readPoint = secondaryRegion.getMVCC().getReadPoint(); 1099 assertEquals(flushSeqId, readPoint); 1100 1101 // after replay verify that everything is still visible 1102 verifyData(secondaryRegion, 0, 100, cq, families); 1103 } 1104 1105 @Test 1106 public void testSeqIdsFromReplay() throws IOException { 1107 // test the case where seqId's coming from replayed WALEdits are made persisted with their 1108 // original seqIds and they are made visible through mvcc read point upon replay 1109 String method = name.getMethodName(); 1110 byte[] tableName = Bytes.toBytes(method); 1111 byte[] family = Bytes.toBytes("family"); 1112 1113 HRegion region = initHRegion(tableName, method, family); 1114 try { 1115 // replay an entry that is bigger than current read point 1116 long readPoint = region.getMVCC().getReadPoint(); 1117 long origSeqId = readPoint + 100; 1118 1119 Put put = new Put(row).addColumn(family, row, row); 1120 put.setDurability(Durability.SKIP_WAL); // we replay with skip wal 1121 replay(region, put, origSeqId); 1122 1123 // read point should have advanced to this seqId 1124 assertGet(region, family, row); 1125 1126 // region seqId should have advanced at least to this seqId 1127 assertEquals(origSeqId, region.getReadPoint(null)); 1128 1129 // replay an entry that is smaller than current read point 1130 // caution: adding an entry below current read point might cause partial dirty reads. Normal 1131 // replay does not allow reads while replay is going on. 1132 put = new Put(row2).addColumn(family, row2, row2); 1133 put.setDurability(Durability.SKIP_WAL); 1134 replay(region, put, origSeqId - 50); 1135 1136 assertGet(region, family, row2); 1137 } finally { 1138 region.close(); 1139 } 1140 } 1141 1142 /** 1143 * Tests that a region opened in secondary mode would not write region open / close events to its 1144 * WAL. 1145 */ 1146 @Test 1147 public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException { 1148 secondaryRegion.close(); 1149 walSecondary = spy(walSecondary); 1150 1151 // test for region open and close 1152 secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null); 1153 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1154 any(WALEdit.class)); 1155 1156 // test for replay prepare flush 1157 putDataByReplay(secondaryRegion, 0, 10, cq, families); 1158 secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder() 1159 .setFlushSequenceNumber(10) 1160 .setTableName(UnsafeByteOperations 1161 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1162 .setAction(FlushAction.START_FLUSH) 1163 .setEncodedRegionName( 1164 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1165 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1166 .build()); 1167 1168 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1169 any(WALEdit.class)); 1170 1171 secondaryRegion.close(); 1172 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1173 any(WALEdit.class)); 1174 } 1175 1176 /** 1177 * Tests the reads enabled flag for the region. When unset all reads should be rejected 1178 */ 1179 @Test 1180 public void testRegionReadsEnabledFlag() throws IOException { 1181 1182 putDataByReplay(secondaryRegion, 0, 100, cq, families); 1183 1184 verifyData(secondaryRegion, 0, 100, cq, families); 1185 1186 // now disable reads 1187 secondaryRegion.setReadsEnabled(false); 1188 try { 1189 verifyData(secondaryRegion, 0, 100, cq, families); 1190 fail("Should have failed with IOException"); 1191 } catch (IOException ex) { 1192 // expected 1193 } 1194 1195 // verify that we can still replay data 1196 putDataByReplay(secondaryRegion, 100, 100, cq, families); 1197 1198 // now enable reads again 1199 secondaryRegion.setReadsEnabled(true); 1200 verifyData(secondaryRegion, 0, 200, cq, families); 1201 } 1202 1203 /** 1204 * Tests the case where a request for flush cache is sent to the region, but region cannot flush. 1205 * It should write the flush request marker instead. 1206 */ 1207 @Test 1208 public void testWriteFlushRequestMarker() throws IOException { 1209 // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false 1210 FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY); 1211 assertNotNull(result); 1212 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result); 1213 assertFalse(result.wroteFlushWalMarker); 1214 1215 // request flush again, but this time with writeFlushRequestWalMarker = true 1216 result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 1217 assertNotNull(result); 1218 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result); 1219 assertTrue(result.wroteFlushWalMarker); 1220 1221 List<FlushDescriptor> flushes = Lists.newArrayList(); 1222 reader = createWALReaderForPrimary(); 1223 while (true) { 1224 WAL.Entry entry = reader.next(); 1225 if (entry == null) { 1226 break; 1227 } 1228 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1229 if (flush != null) { 1230 flushes.add(flush); 1231 } 1232 } 1233 1234 assertEquals(1, flushes.size()); 1235 assertNotNull(flushes.get(0)); 1236 assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction()); 1237 } 1238 1239 /** 1240 * Test the case where the secondary region replica is not in reads enabled state because it is 1241 * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH flush 1242 * marker entry should restore the reads enabled status in the region and allow the reads to 1243 * continue. 1244 */ 1245 @Test 1246 public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException { 1247 disableReads(secondaryRegion); 1248 1249 // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from 1250 // triggered flush restores readsEnabled 1251 primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 1252 reader = createWALReaderForPrimary(); 1253 while (true) { 1254 WAL.Entry entry = reader.next(); 1255 if (entry == null) { 1256 break; 1257 } 1258 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1259 if (flush != null) { 1260 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1261 } 1262 } 1263 1264 // now reads should be enabled 1265 secondaryRegion.get(new Get(Bytes.toBytes(0))); 1266 } 1267 1268 /** 1269 * Test the case where the secondary region replica is not in reads enabled state because it is 1270 * waiting for a flush or region open marker from primary region. Replaying flush start and commit 1271 * entries should restore the reads enabled status in the region and allow the reads to continue. 1272 */ 1273 @Test 1274 public void testReplayingFlushRestoresReadsEnabledState() throws IOException { 1275 // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came 1276 // from triggered flush restores readsEnabled 1277 disableReads(secondaryRegion); 1278 1279 // put some data in primary 1280 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1281 primaryRegion.flush(true); 1282 // I seem to need to push more edits through so the WAL flushes on local fs. This was not 1283 // needed before HBASE-15028. Not sure whats up. I can see that we have not flushed if I 1284 // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content.. 1285 // Doing same check before HBASE-15028 I can see all edits flushed to the WAL. Somethings up 1286 // but can't figure it... and this is only test that seems to suffer this flush issue. 1287 // St.Ack 20160201 1288 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1289 1290 reader = createWALReaderForPrimary(); 1291 while (true) { 1292 WAL.Entry entry = reader.next(); 1293 LOG.info(Objects.toString(entry)); 1294 if (entry == null) { 1295 break; 1296 } 1297 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1298 if (flush != null) { 1299 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1300 } else { 1301 replayEdit(secondaryRegion, entry); 1302 } 1303 } 1304 1305 // now reads should be enabled 1306 verifyData(secondaryRegion, 0, 100, cq, families); 1307 } 1308 1309 /** 1310 * Test the case where the secondary region replica is not in reads enabled state because it is 1311 * waiting for a flush or region open marker from primary region. Replaying flush start and commit 1312 * entries should restore the reads enabled status in the region and allow the reads to continue. 1313 */ 1314 @Test 1315 public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException { 1316 // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came 1317 // from triggered flush restores readsEnabled 1318 disableReads(secondaryRegion); 1319 1320 // put some data in primary 1321 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1322 primaryRegion.flush(true); 1323 1324 reader = createWALReaderForPrimary(); 1325 while (true) { 1326 WAL.Entry entry = reader.next(); 1327 if (entry == null) { 1328 break; 1329 } 1330 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1331 if (flush != null) { 1332 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1333 } 1334 } 1335 1336 // now reads should be enabled 1337 verifyData(secondaryRegion, 0, 100, cq, families); 1338 } 1339 1340 /** 1341 * Test the case where the secondary region replica is not in reads enabled state because it is 1342 * waiting for a flush or region open marker from primary region. Replaying region open event 1343 * entry from primary should restore the reads enabled status in the region and allow the reads to 1344 * continue. 1345 */ 1346 @Test 1347 public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException { 1348 // Test case 3: Test that replaying region open event markers restores readsEnabled 1349 disableReads(secondaryRegion); 1350 1351 primaryRegion.close(); 1352 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1353 1354 reader = createWALReaderForPrimary(); 1355 while (true) { 1356 WAL.Entry entry = reader.next(); 1357 if (entry == null) { 1358 break; 1359 } 1360 1361 RegionEventDescriptor regionEventDesc = 1362 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 1363 1364 if (regionEventDesc != null) { 1365 secondaryRegion.replayWALRegionEventMarker(regionEventDesc); 1366 } 1367 } 1368 1369 // now reads should be enabled 1370 secondaryRegion.get(new Get(Bytes.toBytes(0))); 1371 } 1372 1373 @Test 1374 public void testRefresStoreFiles() throws IOException { 1375 assertEquals(0, primaryRegion.getStoreFileList(families).size()); 1376 assertEquals(0, secondaryRegion.getStoreFileList(families).size()); 1377 1378 // Test case 1: refresh with an empty region 1379 secondaryRegion.refreshStoreFiles(); 1380 assertEquals(0, secondaryRegion.getStoreFileList(families).size()); 1381 1382 // do one flush 1383 putDataWithFlushes(primaryRegion, 100, 100, 0); 1384 int numRows = 100; 1385 1386 // refresh the store file list, and ensure that the files are picked up. 1387 secondaryRegion.refreshStoreFiles(); 1388 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1389 secondaryRegion.getStoreFileList(families)); 1390 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); 1391 1392 LOG.info("-- Verifying edits from secondary"); 1393 verifyData(secondaryRegion, 0, numRows, cq, families); 1394 1395 // Test case 2: 3 some more flushes 1396 putDataWithFlushes(primaryRegion, 100, 300, 0); 1397 numRows = 300; 1398 1399 // refresh the store file list, and ensure that the files are picked up. 1400 secondaryRegion.refreshStoreFiles(); 1401 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1402 secondaryRegion.getStoreFileList(families)); 1403 assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size()); 1404 1405 LOG.info("-- Verifying edits from secondary"); 1406 verifyData(secondaryRegion, 0, numRows, cq, families); 1407 1408 if (FSUtils.WINDOWS) { 1409 // compaction cannot move files while they are open in secondary on windows. Skip remaining. 1410 return; 1411 } 1412 1413 // Test case 3: compact primary files 1414 primaryRegion.compactStores(); 1415 List<HRegion> regions = new ArrayList<>(); 1416 regions.add(primaryRegion); 1417 Mockito.doReturn(regions).when(rss).getRegions(); 1418 CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false); 1419 cleaner.chore(); 1420 secondaryRegion.refreshStoreFiles(); 1421 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1422 secondaryRegion.getStoreFileList(families)); 1423 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); 1424 1425 LOG.info("-- Verifying edits from secondary"); 1426 verifyData(secondaryRegion, 0, numRows, cq, families); 1427 1428 LOG.info("-- Replaying edits in secondary"); 1429 1430 // Test case 4: replay some edits, ensure that memstore is dropped. 1431 assertTrue(secondaryRegion.getMemStoreDataSize() == 0); 1432 putDataWithFlushes(primaryRegion, 400, 400, 0); 1433 numRows = 400; 1434 1435 reader = createWALReaderForPrimary(); 1436 while (true) { 1437 WAL.Entry entry = reader.next(); 1438 if (entry == null) { 1439 break; 1440 } 1441 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1442 if (flush != null) { 1443 // do not replay flush 1444 } else { 1445 replayEdit(secondaryRegion, entry); 1446 } 1447 } 1448 1449 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); 1450 1451 secondaryRegion.refreshStoreFiles(); 1452 1453 assertTrue(secondaryRegion.getMemStoreDataSize() == 0); 1454 1455 LOG.info("-- Verifying edits from primary"); 1456 verifyData(primaryRegion, 0, numRows, cq, families); 1457 LOG.info("-- Verifying edits from secondary"); 1458 verifyData(secondaryRegion, 0, numRows, cq, families); 1459 } 1460 1461 /** 1462 * Paths can be qualified or not. This does the assertion using String->Path conversion. 1463 */ 1464 private void assertPathListsEqual(List<String> list1, List<String> list2) { 1465 List<Path> l1 = new ArrayList<>(list1.size()); 1466 for (String path : list1) { 1467 l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); 1468 } 1469 List<Path> l2 = new ArrayList<>(list2.size()); 1470 for (String path : list2) { 1471 l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); 1472 } 1473 assertEquals(l1, l2); 1474 } 1475 1476 private void disableReads(HRegion region) { 1477 region.setReadsEnabled(false); 1478 try { 1479 verifyData(region, 0, 1, cq, families); 1480 fail("Should have failed with IOException"); 1481 } catch (IOException ex) { 1482 // expected 1483 } 1484 } 1485 1486 private void replay(HRegion region, Put put, long replaySeqId) throws IOException { 1487 put.setDurability(Durability.SKIP_WAL); 1488 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); 1489 region.batchReplay(new MutationReplay[] { mutation }, replaySeqId); 1490 } 1491 1492 /** 1493 * Tests replaying region open markers from primary region. Checks whether the files are picked up 1494 */ 1495 @Test 1496 public void testReplayBulkLoadEvent() throws IOException { 1497 LOG.info("testReplayBulkLoadEvent starts"); 1498 putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush 1499 1500 // close the region and open again. 1501 primaryRegion.close(); 1502 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1503 1504 // bulk load a file into primary region 1505 byte[] randomValues = new byte[20]; 1506 Bytes.random(randomValues); 1507 Path testPath = TEST_UTIL.getDataTestDirOnTestFS(); 1508 1509 List<Pair<byte[], String>> familyPaths = new ArrayList<>(); 1510 int expectedLoadFileCount = 0; 1511 for (byte[] family : families) { 1512 familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues))); 1513 expectedLoadFileCount++; 1514 } 1515 primaryRegion.bulkLoadHFiles(familyPaths, false, null); 1516 1517 // now replay the edits and the bulk load marker 1518 reader = createWALReaderForPrimary(); 1519 1520 LOG.info("-- Replaying edits and region events in secondary"); 1521 BulkLoadDescriptor bulkloadEvent = null; 1522 while (true) { 1523 WAL.Entry entry = reader.next(); 1524 if (entry == null) { 1525 break; 1526 } 1527 bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0)); 1528 if (bulkloadEvent != null) { 1529 break; 1530 } 1531 } 1532 1533 // we should have 1 bulk load event 1534 assertTrue(bulkloadEvent != null); 1535 assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount()); 1536 1537 // replay the bulk load event 1538 secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent); 1539 1540 List<String> storeFileName = new ArrayList<>(); 1541 for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) { 1542 storeFileName.addAll(storeDesc.getStoreFileList()); 1543 } 1544 // assert that the bulk loaded files are picked 1545 for (HStore s : secondaryRegion.getStores()) { 1546 for (HStoreFile sf : s.getStorefiles()) { 1547 storeFileName.remove(sf.getPath().getName()); 1548 } 1549 } 1550 assertTrue("Found some store file isn't loaded:" + storeFileName, storeFileName.isEmpty()); 1551 1552 LOG.info("-- Verifying edits from secondary"); 1553 for (byte[] family : families) { 1554 assertGet(secondaryRegion, family, randomValues); 1555 } 1556 } 1557 1558 @Test 1559 public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException { 1560 // tests replaying flush commit marker, but the flush file has already been compacted 1561 // from primary and also deleted from the archive directory 1562 secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder() 1563 .setFlushSequenceNumber(Long.MAX_VALUE) 1564 .setTableName(UnsafeByteOperations 1565 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1566 .setAction(FlushAction.COMMIT_FLUSH) 1567 .setEncodedRegionName( 1568 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1569 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1570 .addStoreFlushes(StoreFlushDescriptor.newBuilder() 1571 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1572 .setStoreHomeDir("/store_home_dir").addFlushOutput("/foo/baz/123").build()) 1573 .build()); 1574 } 1575 1576 @Test 1577 public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException { 1578 // tests replaying compaction marker, but the compaction output file has already been compacted 1579 // from primary and also deleted from the archive directory 1580 secondaryRegion 1581 .replayWALCompactionMarker( 1582 CompactionDescriptor.newBuilder() 1583 .setTableName(UnsafeByteOperations 1584 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1585 .setEncodedRegionName( 1586 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1587 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])).addCompactionInput("/123") 1588 .addCompactionOutput("/456").setStoreHomeDir("/store_home_dir") 1589 .setRegionName( 1590 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1591 .build(), 1592 true, true, Long.MAX_VALUE); 1593 } 1594 1595 @Test 1596 public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException { 1597 // tests replaying region open event marker, but the region files have already been compacted 1598 // from primary and also deleted from the archive directory 1599 secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder() 1600 .setTableName(UnsafeByteOperations 1601 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1602 .setEncodedRegionName( 1603 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1604 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1605 .setEventType(EventType.REGION_OPEN) 1606 .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1))) 1607 .setLogSequenceNumber(Long.MAX_VALUE) 1608 .addStores( 1609 StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1610 .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build()) 1611 .build()); 1612 } 1613 1614 @Test 1615 public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException { 1616 // tests replaying bulk load event marker, but the bulk load files have already been compacted 1617 // from primary and also deleted from the archive directory 1618 secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder() 1619 .setTableName( 1620 ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName())) 1621 .setEncodedRegionName( 1622 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1623 .setBulkloadSeqNum(Long.MAX_VALUE) 1624 .addStores( 1625 StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1626 .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build()) 1627 .build()); 1628 } 1629 1630 private String createHFileForFamilies(Path testPath, byte[] family, byte[] valueBytes) 1631 throws IOException { 1632 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration()); 1633 // TODO We need a way to do this without creating files 1634 Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString()); 1635 FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile); 1636 try { 1637 hFileFactory.withOutputStream(out); 1638 hFileFactory.withFileContext(new HFileContextBuilder().build()); 1639 HFile.Writer writer = hFileFactory.create(); 1640 try { 1641 writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0L, 1642 KeyValue.Type.Put.getCode(), valueBytes))); 1643 } finally { 1644 writer.close(); 1645 } 1646 } finally { 1647 out.close(); 1648 } 1649 return testFile.toString(); 1650 } 1651 1652 /** 1653 * Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does a flush 1654 * every flushInterval number of records. Then it puts numRowsAfterFlush number of more rows but 1655 * does not execute flush after 1656 */ 1657 private void putDataWithFlushes(HRegion region, int flushInterval, int numRows, 1658 int numRowsAfterFlush) throws IOException { 1659 int start = 0; 1660 for (; start < numRows; start += flushInterval) { 1661 LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval)); 1662 putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families); 1663 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 1664 region.flush(true); 1665 } 1666 LOG.info("-- Writing some more data to primary, not flushing"); 1667 putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families); 1668 } 1669 1670 private void putDataByReplay(HRegion region, int startRow, int numRows, byte[] qf, 1671 byte[]... families) throws IOException { 1672 for (int i = startRow; i < startRow + numRows; i++) { 1673 Put put = new Put(Bytes.toBytes("" + i)); 1674 put.setDurability(Durability.SKIP_WAL); 1675 for (byte[] family : families) { 1676 put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null); 1677 } 1678 replay(region, put, i + 1); 1679 } 1680 } 1681 1682 private static HRegion initHRegion(byte[] tableName, String callingMethod, byte[]... families) 1683 throws IOException { 1684 return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 1685 callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families); 1686 } 1687 1688 private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, 1689 String callingMethod, Configuration conf, boolean isReadOnly, Durability durability, WAL wal, 1690 byte[]... families) throws IOException { 1691 return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf, 1692 isReadOnly, durability, wal, families); 1693 } 1694}