/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary
 * region replicas
 */
@Category(LargeTests.class)
public class TestHRegionReplayEvents {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHRegionReplayEvents.class);
  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionReplayEvents.class);
  @Rule
  public TestName name = new TestName();

  private static HBaseTestingUtility TEST_UTIL;

  public static Configuration CONF;
  private String dir;

  private byte[][] families =
    new byte[][] { Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") };

  // Test names
  protected byte[] tableName;
  protected String method;
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");
  protected byte[] cq = Bytes.toBytes("cq");

  // per test fields
  private Path rootDir;
  private TableDescriptor htd;
  private RegionServerServices rss;
  private RegionInfo primaryHri, secondaryHri;
  private HRegion primaryRegion, secondaryRegion;
  private WAL walPrimary, walSecondary;
  private WALStreamReader reader;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL = new HBaseTestingUtility();
    TEST_UTIL.startMiniDFSCluster(1);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
    TEST_UTIL.cleanupTestDir();
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Before
  public void setUp() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
    dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
    method = name.getMethodName();
    tableName = Bytes.toBytes(name.getMethodName());
    rootDir = new Path(dir + method);
    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method));
    for (byte[] family : families) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    }
    htd = builder.build();

    long time = EnvironmentEdgeManager.currentTime();
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
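    // replica id 0 designates the primary region; replica id 1 a read-only secondary replica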
    primaryHri =
      RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build();
    secondaryHri =
      RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build();

    WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir);
    walPrimary = wals.getWAL(primaryHri);
    walSecondary = wals.getWAL(secondaryHri);

    rss = mock(RegionServerServices.class);
    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
    when(rss.getConfiguration()).thenReturn(CONF);
    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF));
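    // the mocked region server needs a real compacted-files discharger executor; compaction
    // replay and region close hand compacted store file cleanup off to this service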
    String string =
      org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER.toString();
    ExecutorService es = new ExecutorService(string);
    es.startExecutorService(es.new ExecutorConfig().setCorePoolSize(1)
      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER));
    when(rss.getExecutorService()).thenReturn(es);
    primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
    primaryRegion.close();
    List<HRegion> regions = new ArrayList<>();
    regions.add(primaryRegion);
    Mockito.doReturn(regions).when(rss).getRegions();

    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);

    reader = null;
  }

  @After
  public void tearDown() throws Exception {
    if (reader != null) {
      reader.close();
    }

    if (primaryRegion != null) {
      HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
    }
    if (secondaryRegion != null) {
      HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
    }

    EnvironmentEdgeManagerTestHelper.reset();
  }

  String getName() {
    return name.getMethodName();
  }

  // Some of the test cases are as follows:
  // 1. replay flush start marker again
  // 2. replay flush with smaller seqId than what is there in memstore snapshot
  // 3. replay flush with larger seqId than what is there in memstore snapshot
  // 4. replay flush commit without flush prepare. non droppable memstore
  // 5. replay flush commit without flush prepare. droppable memstore
  // 6. replay open region event
  // 7. replay open region event after flush start
  // 8. replay flush from an earlier seqId (test ignoring seqIds)
  // 9. start flush does not prevent region from closing.

  @Test
  public void testRegionReplicaSecondaryCannotFlush() throws IOException {
    // load some data and flush; ensure that the secondary replica will not execute the flush

    // load some data to secondary by replaying
    putDataByReplay(secondaryRegion, 0, 1000, cq, families);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // flush region
    FlushResultImpl flush = (FlushResultImpl) secondaryRegion.flush(true);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // close the region, and inspect that it has not flushed
    Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false);
    // assert that there are no files (since the flush request was not executed)
    for (List<HStoreFile> f : files.values()) {
      assertTrue(f.isEmpty());
    }
  }

  /**
   * Tests a case where we replay only a flush start marker, then the region is closed. This region
   * should not block indefinitely
   */
  @Test
  public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- NOT Replaying flush commit in secondary");
        }
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0);
    // now close the region, which should not hang waiting for the un-committed flush
    secondaryRegion.close();

    // verify that the memstore size is back to what it was
    assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize());
  }

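  /**
   * Replays a single WAL entry into the given region as a batch of one, preserving the entry's
   * original sequence id. Meta edits (flush and region event markers) are skipped and return 0;
   * otherwise returns the row key parsed back as an int (the put helpers write rows as
   * stringified integers).
   */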
  static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
    if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
      return 0; // handled elsewhere
    }
    Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0)));
    for (Cell cell : entry.getEdit().getCells()) {
      put.add(cell);
    }
    put.setDurability(Durability.SKIP_WAL);
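    // nonce group and nonce are irrelevant for this replay; 0 means "no nonce" for both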
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] { mutation }, entry.getKey().getSequenceId());
    return Integer.parseInt(Bytes.toString(put.getRow()));
  }

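  /**
   * Opens a reader on the primary region's current WAL file. NoEOFWALStreamReader tolerates the
   * EOF expected at the tail of a WAL that is still open for writing.
   */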
  private WALStreamReader createWALReaderForPrimary() throws FileNotFoundException, IOException {
    return NoEOFWALStreamReader.create(TEST_UTIL.getTestFileSystem(),
      AbstractFSWALProvider.getCurrentFileName(walPrimary), TEST_UTIL.getConfiguration());
  }

  @Test
  public void testBatchReplayWithMultipleNonces() throws IOException {
    try {
      MutationReplay[] mutations = new MutationReplay[100];
      for (int i = 0; i < 100; i++) {
        Put put = new Put(Bytes.toBytes(i));
        put.setDurability(Durability.SYNC_WAL);
        for (byte[] family : this.families) {
          put.addColumn(family, this.cq, null);
          long nonceNum = i / 10;
          mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum);
        }
      }
      primaryRegion.batchReplay(mutations, 20);
    } catch (Exception e) {
      String msg = "Error while replaying batch with multiple nonces. ";
      LOG.error(msg, e);
      fail(msg + e.getMessage());
    }
  }

  @Test
  public void testReplayFlushesAndCompactions() throws IOException {
    // initialize a secondary region with some data.

    // load some data to primary and flush. 3 flushes and some more unflushed data
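    // (the helper writes 300 rows, flushing after every 100, then 100 more rows left unflushed)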
    putDataWithFlushes(primaryRegion, 100, 300, 100);

    // compaction from primary
    LOG.info("-- Compacting primary, only 1 store");
    primaryRegion.compactStore(Bytes.toBytes("cf1"), NoLimitThroughputController.INSTANCE);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    int lastReplayed = 0;
    int expectedStoreFileCount = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      CompactionDescriptor compactionDesc =
        WALEdit.getCompaction(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        verifyData(secondaryRegion, 0, lastReplayed, cq, families);
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();
        long storeSize = store.getSize();
        long storeSizeUncompressed = store.getStoreSizeUncompressed();
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
390          LOG.info("Memstore size reduced by:"
391            + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
392          assertTrue(storeMemstoreSize > newStoreMemstoreSize);

        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);

          // assert that the flush files are picked
          expectedStoreFileCount++;
          for (HStore s : secondaryRegion.getStores()) {
            assertEquals(expectedStoreFileCount, s.getStorefilesCount());
          }
          MemStoreSize newMss = store.getFlushableSize();
          assertTrue(mss.getHeapSize() > newMss.getHeapSize());

          // assert that the region memstore is smaller now
          long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
          assertTrue(regionMemstoreSize > newRegionMemstoreSize);

          // assert that the store sizes are bigger
          assertTrue(store.getSize() > storeSize);
          assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
          assertEquals(store.getSize(), store.getStorefilesSize());
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else if (compactionDesc != null) {
        secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);

        // assert that the compaction is applied
        for (HStore store : secondaryRegion.getStores()) {
          if (store.getColumnFamilyName().equals("cf1")) {
            assertEquals(1, store.getStorefilesCount());
          } else {
            assertEquals(expectedStoreFileCount, store.getStorefilesCount());
          }
        }
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    assertEquals(400 - 1, lastReplayed);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, 400, cq, families);

    LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
    verifyData(primaryRegion, 0, lastReplayed, cq, families);
    for (HStore store : primaryRegion.getStores()) {
      if (store.getColumnFamilyName().equals("cf1")) {
        assertEquals(1, store.getStorefilesCount());
      } else {
        assertEquals(expectedStoreFileCount, store.getStorefilesCount());
      }
    }
  }

  /**
   * Tests cases where we prepare a flush with some seqId and we receive other flush start markers
   * equal to, greater than, or less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushStartMarkers() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");

    FlushDescriptor startFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();

        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          startFlushDesc = flushDesc;
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
          assertTrue(regionMemstoreSize > 0);
          assertTrue(mss.getHeapSize() > 0);

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
491          LOG.info("Memstore size reduced by:"
492            + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
493          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
          verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);

        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)

    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 1: replay the same flush start marker again
    LOG.info("-- Replaying same flush start in secondary again");
    PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: replay a flush start marker with a smaller seqId
    FlushDescriptor startFlushDescSmallerSeqId =
      clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 3: replay a flush start marker with a larger seqId
    FlushDescriptor startFlushDescLargerSeqId =
      clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 2 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 200, 100);
    int numRows = 300;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      LOG.debug("Last replayed row: {}", lastReplayed);
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // don't replay the first flush start marker, hold on to it, replay the second one
          if (startFlushDesc == null) {
            startFlushDesc = flushDesc;
          } else {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          if (commitFlushDesc == null) {
            commitFlushDesc = flushDesc; // hold on to the first flush commit marker
          }
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-200) in memstore snapshot
    // and some more data in memstores (rows 200-300)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker smaller than what we have prepared
614    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
615      + startFlushDesc);
616    assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());
617
618    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
619    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
620
621    // assert that the flush files are picked
622    expectedStoreFileCount++;
623    for (HStore s : secondaryRegion.getStores()) {
624      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
625    }
626    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
627    MemStoreSize mss = store.getFlushableSize();
628    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
629
630    // assert that the region memstore is same as before
631    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
632    assertEquals(regionMemstoreSize, newRegionMemstoreSize);
633
634    assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped
635
636    LOG.info("-- Verifying edits from secondary");
637    verifyData(secondaryRegion, 0, numRows, cq, families);
638
639    LOG.info("-- Verifying edits from primary.");
640    verifyData(primaryRegion, 0, numRows, cq, families);
641  }
642
643  /**
644   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
645   * larger than the previous flush start marker.
646   */
647  @Test
648  public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
649    // load some data to primary and flush. 1 flush and some more unflushed data
650    putDataWithFlushes(primaryRegion, 100, 100, 100);
651    int numRows = 200;
652
653    // now replay the edits and the flush marker
654    reader = createWALReaderForPrimary();
655
656    LOG.info("-- Replaying edits and flush events in secondary");
657    FlushDescriptor startFlushDesc = null;
658    FlushDescriptor commitFlushDesc = null;
659
660    int lastReplayed = 0;
661    while (true) {
662      WAL.Entry entry = reader.next();
663      if (entry == null) {
664        break;
665      }
666      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
667      if (flushDesc != null) {
668        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
669          if (startFlushDesc == null) {
670            LOG.info("-- Replaying flush start in secondary");
671            startFlushDesc = flushDesc;
672            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
673            assertNull(result.result);
674          }
675        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
676          // do not replay any flush commit yet
677          // hold on to the flush commit marker but simulate a larger
678          // flush commit seqId
679          commitFlushDesc = FlushDescriptor.newBuilder(flushDesc)
680            .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50).build();
681        }
682        // after replay verify that everything is still visible
683        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
684      } else {
685        lastReplayed = replayEdit(secondaryRegion, entry);
686      }
687    }
688
689    // at this point, there should be some data (rows 0-100) in memstore snapshot
690    // and some more data in memstores (rows 100-200)
691    verifyData(secondaryRegion, 0, numRows, cq, families);
692
693    // no store files in the region
694    int expectedStoreFileCount = 0;
695    for (HStore s : secondaryRegion.getStores()) {
696      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
697    }
698    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
699
    // Test case 1: replay a flush commit marker larger than what we have prepared
701    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
702      + startFlushDesc);
703    assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());
704
705    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
706    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
707
708    // assert that the flush files are picked
709    expectedStoreFileCount++;
710    for (HStore s : secondaryRegion.getStores()) {
711      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
712    }
713    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
714    MemStoreSize mss = store.getFlushableSize();
715    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
716
717    // assert that the region memstore is smaller than before, but not empty
718    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
719    assertTrue(newRegionMemstoreSize > 0);
720    assertTrue(regionMemstoreSize > newRegionMemstoreSize);
721
722    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped
723
724    LOG.info("-- Verifying edits from secondary");
725    verifyData(secondaryRegion, 0, numRows, cq, families);
726
727    LOG.info("-- Verifying edits from primary.");
728    verifyData(primaryRegion, 0, numRows, cq, families);
729  }
730
731  /**
732   * Tests the case where we receive a flush commit before receiving any flush prepare markers. The
733   * memstore edits should be dropped after the flush commit replay since they should be in flushed
734   * files
735   */
736  @Test
737  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
738    throws IOException {
739    testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
740  }
741
  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers. The
   * memstore edits should not be dropped after the flush commit replay since not every edit will
   * be in flushed files (based on seqId)
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
    throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers
   */
  public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
    throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data.
    // write more data after the flush depending on whether the memstore is droppable
    putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
    int numRows = droppableMemstore ? 100 : 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // do not replay flush start marker
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          commitFlushDesc = flushDesc; // hold on to the flush commit marker
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, all the replayed data (rows 0 to numRows) is in the memstore, and no
    // snapshot was taken since we did not replay the flush start marker
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker without start flush marker
    assertNull(secondaryRegion.getPrepareFlushResult());
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);

    // ensure all files are visible in secondary
    for (HStore store : secondaryRegion.getStores()) {
      assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null));
    }

    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    if (droppableMemstore) {
      // assert that the memstore is dropped
      assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);
    } else {
      assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
    }

    // assert that the region memstore is either dropped (droppable case) or unchanged
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    if (droppableMemstore) {
      assertTrue(0 == newRegionMemstoreSize);
    } else {
      assertTrue(regionMemstoreSize == newRegionMemstoreSize);
    }

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

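  /** Returns a copy of the given flush descriptor with only the flush sequence number replaced. */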
  private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
    return FlushDescriptor.newBuilder(flush).setFlushSequenceNumber(flushSeqId).build();
  }

  /**
   * Tests replaying region open markers from primary region. Checks whether the files are picked up
   */
  @Test
  public void testReplayRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flush events
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        // don't replay edits
      }
    }

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // replay the first region open event.
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));

    // replay the close event as well
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(regionMemstoreSize == 0);

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if
                                                         // any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we replay a region open event after a flush start but before receiving
   * flush commit
   */
  @Test
  public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // only replay flush start
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        }
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in the memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount = 2; // two flushes happened
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize newSnapshotSize = store.getSnapshotSize();
    assertTrue(newSnapshotSize.getDataSize() == 0);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if
                                                         // any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests that edits coming in for replay are skipped when their seqId is smaller than the seqId
   * of the last replayed region open event.
   */
  @Test
  public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
    List<WAL.Entry> edits = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flushes
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        edits.add(entry);
      }
    }

    // replay the first region open event, but with the seqId of the second open event;
    // this way none of the flush files will be picked up.
    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder(regionEvents.get(0))
      .setLogSequenceNumber(regionEvents.get(2).getLogSequenceNumber()).build());

    // replay the edits from before the region close. Replay should skip these edits since their
    // seqIds are smaller, so the verification below is expected to fail.
    for (WAL.Entry entry : edits) {
      replayEdit(secondaryRegion, entry);
    }

    boolean expectedFail = false;
    try {
      verifyData(secondaryRegion, 0, numRows, cq, families);
    } catch (AssertionError e) {
      expectedFail = true; // expected
    }
    if (!expectedFail) {
      fail("Should have failed this verification");
    }
  }

  @Test
  public void testReplayFlushSeqIds() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the flush marker
    reader = createWALReaderForPrimary();

    long flushSeqId = -1;
    LOG.info("-- Replaying flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
          flushSeqId = flushDesc.getFlushSequenceNumber();
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
          assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
        }
      }
      // else do not replay
    }

    // TODO: what to do with this?
    // assert that the newly picked up flush file is visible
    long readPoint = secondaryRegion.getMVCC().getReadPoint();
    assertEquals(flushSeqId, readPoint);

    // after replay verify that everything is still visible
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  @Test
  public void testSeqIdsFromReplay() throws IOException {
    // test the case where seqIds coming from replayed WALEdits are persisted with their
    // original values and made visible through the mvcc read point upon replay
    String method = name.getMethodName();
    byte[] tableName = Bytes.toBytes(method);
    byte[] family = Bytes.toBytes("family");

    HRegion region = initHRegion(tableName, method, family);
    try {
      // replay an entry that is bigger than current read point
      long readPoint = region.getMVCC().getReadPoint();
      long origSeqId = readPoint + 100;

      Put put = new Put(row).addColumn(family, row, row);
      put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
      replay(region, put, origSeqId);

      // read point should have advanced to this seqId
      assertGet(region, family, row);

      // region seqId should have advanced at least to this seqId
      assertEquals(origSeqId, region.getReadPoint(null));

      // replay an entry that is smaller than current read point
      // caution: adding an entry below current read point might cause partial dirty reads. Normal
      // replay does not allow reads while replay is going on.
      put = new Put(row2).addColumn(family, row2, row2);
      put.setDurability(Durability.SKIP_WAL);
      replay(region, put, origSeqId - 50);

      assertGet(region, family, row2);
    } finally {
      region.close();
    }
  }

  /**
   * Tests that a region opened in secondary mode would not write region open / close events to its
   * WAL.
   */
  @Test
  public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
    secondaryRegion.close();
    walSecondary = spy(walSecondary);
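    // the WAL is wrapped in a Mockito spy so we can verify that the secondary never appends to it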

    // test for region open and close
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));

    // test for replay prepare flush
    putDataByReplay(secondaryRegion, 0, 10, cq, families);
    secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder()
      .setFlushSequenceNumber(10)
      .setTableName(UnsafeByteOperations
        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
      .setAction(FlushAction.START_FLUSH)
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .build());

    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));

    secondaryRegion.close();
    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));
  }

  /**
   * Tests the reads enabled flag for the region. When unset, all reads should be rejected.
   */
  @Test
  public void testRegionReadsEnabledFlag() throws IOException {

    putDataByReplay(secondaryRegion, 0, 100, cq, families);

    verifyData(secondaryRegion, 0, 100, cq, families);

    // now disable reads
    secondaryRegion.setReadsEnabled(false);
    try {
      verifyData(secondaryRegion, 0, 100, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }

    // verify that we can still replay data
    putDataByReplay(secondaryRegion, 100, 100, cq, families);

    // now enable reads again
    secondaryRegion.setReadsEnabled(true);
    verifyData(secondaryRegion, 0, 200, cq, families);
  }

  /**
   * Tests the case where a request for flush cache is sent to the region, but the region cannot
   * flush. It should write the flush request marker instead.
   */
  @Test
  public void testWriteFlushRequestMarker() throws IOException {
    // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
    FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertFalse(result.wroteFlushWalMarker);

    // request flush again, but this time with writeFlushRequestWalMarker = true
    result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertTrue(result.wroteFlushWalMarker);

    List<FlushDescriptor> flushes = Lists.newArrayList();
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        flushes.add(flush);
      }
    }

    assertEquals(1, flushes.size());
    assertNotNull(flushes.get(0));
    assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH flush
   * marker entry should restore the reads enabled status in the region and allow the reads to
   * continue.
   */
  @Test
  public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
    disableReads(secondaryRegion);

    // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from
    // triggered flush restores readsEnabled
    primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
   * entries should restore the reads enabled status in the region and allow the reads to continue.
   */
  @Test
  public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
    // from triggered flush restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);
1282    // More edits seem to be needed to push through so the WAL flushes on the local fs. This was
1283    // not needed before HBASE-15028; not sure what's up. Pausing the test here and inspecting the
1284    // WAL content with WALPrettyPrinter shows that we have not flushed, whereas the same check
1285    // before HBASE-15028 shows all edits flushed to the WAL. Something's up but I can't figure it
1286    // out, and this is the only test that seems to suffer from this flush issue.
1287    // St.Ack 20160201
1288    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1289
1290    reader = createWALReaderForPrimary();
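    // Replay data edits and FLUSH_START/FLUSH_COMMIT markers in WAL order; replaying the flush
    // markers is what should restore the reads enabled state on the secondary.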
1291    while (true) {
1292      WAL.Entry entry = reader.next();
1293      LOG.info(Objects.toString(entry));
1294      if (entry == null) {
1295        break;
1296      }
1297      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1298      if (flush != null) {
1299        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
1300      } else {
1301        replayEdit(secondaryRegion, entry);
1302      }
1303    }
1304
1305    // now reads should be enabled
1306    verifyData(secondaryRegion, 0, 100, cq, families);
1307  }
1308
1309  /**
1310   * Test the case where the secondary region replica is not in reads enabled state because it is
1311   * waiting for a flush or region open marker from primary region. Replaying flush start and
1312   * commit entries with an empty memstore should still restore the reads enabled status.
1313   */
1314  @Test
1315  public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
1316    // Same as test case 2, except that no data edits are replayed, so the FLUSH_START and
1317    // FLUSH_COMMIT markers are applied against an empty memstore
1318    disableReads(secondaryRegion);
1319
1320    // put some data in primary
1321    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1322    primaryRegion.flush(true);
1323
1324    reader = createWALReaderForPrimary();
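    // Replay only the flush markers, skipping the data edits, so the markers are applied
    // against an empty secondary memstore.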
1325    while (true) {
1326      WAL.Entry entry = reader.next();
1327      if (entry == null) {
1328        break;
1329      }
1330      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1331      if (flush != null) {
1332        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
1333      }
1334    }
1335
1336    // now reads should be enabled
1337    verifyData(secondaryRegion, 0, 100, cq, families);
1338  }
1339
1340  /**
1341   * Test the case where the secondary region replica is not in reads enabled state because it is
1342   * waiting for a flush or region open marker from primary region. Replaying region open event
1343   * entry from primary should restore the reads enabled status in the region and allow the reads to
1344   * continue.
1345   */
1346  @Test
1347  public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
1348    // Test case 3: Test that replaying region open event markers restores readsEnabled
1349    disableReads(secondaryRegion);
1350
1351    primaryRegion.close();
1352    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1353
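    // Reopening the primary above wrote a REGION_OPEN event marker to its WAL; replaying that
    // marker into the secondary should re-enable reads.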
1354    reader = createWALReaderForPrimary();
1355    while (true) {
1356      WAL.Entry entry = reader.next();
1357      if (entry == null) {
1358        break;
1359      }
1360
1361      RegionEventDescriptor regionEventDesc =
1362        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
1363
1364      if (regionEventDesc != null) {
1365        secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
1366      }
1367    }
1368
1369    // now reads should be enabled
1370    secondaryRegion.get(new Get(Bytes.toBytes(0)));
1371  }
1372
1373  @Test
1374  public void testRefreshStoreFiles() throws IOException {
1375    assertEquals(0, primaryRegion.getStoreFileList(families).size());
1376    assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1377
1378    // Test case 1: refresh with an empty region
1379    secondaryRegion.refreshStoreFiles();
1380    assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1381
1382    // do one flush
1383    putDataWithFlushes(primaryRegion, 100, 100, 0);
1384    int numRows = 100;
1385
1386    // refresh the store file list, and ensure that the files are picked up.
1387    secondaryRegion.refreshStoreFiles();
1388    assertPathListsEqual(primaryRegion.getStoreFileList(families),
1389      secondaryRegion.getStoreFileList(families));
1390    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1391
1392    LOG.info("-- Verifying edits from secondary");
1393    verifyData(secondaryRegion, 0, numRows, cq, families);
1394
1395    // Test case 2: do 3 more flushes
1396    putDataWithFlushes(primaryRegion, 100, 300, 0);
1397    numRows = 300;
1398
1399    // refresh the store file list, and ensure that the files are picked up.
1400    secondaryRegion.refreshStoreFiles();
1401    assertPathListsEqual(primaryRegion.getStoreFileList(families),
1402      secondaryRegion.getStoreFileList(families));
1403    assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());
1404
1405    LOG.info("-- Verifying edits from secondary");
1406    verifyData(secondaryRegion, 0, numRows, cq, families);
1407
1408    if (FSUtils.WINDOWS) {
1409      // compaction cannot move files while they are open in secondary on windows. Skip remaining.
1410      return;
1411    }
1412
1413    // Test case 3: compact primary files
1414    primaryRegion.compactStores();
1415    List<HRegion> regions = new ArrayList<>();
1416    regions.add(primaryRegion);
1417    Mockito.doReturn(regions).when(rss).getRegions();
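    // Run the discharger chore so the compacted-away store files are archived on the primary;
    // the refresh below should then pick up only the compacted file in each store.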
1418    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
1419    cleaner.chore();
1420    secondaryRegion.refreshStoreFiles();
1421    assertPathListsEqual(primaryRegion.getStoreFileList(families),
1422      secondaryRegion.getStoreFileList(families));
1423    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1424
1425    LOG.info("-- Verifying edits from secondary");
1426    verifyData(secondaryRegion, 0, numRows, cq, families);
1427
1428    LOG.info("-- Replaying edits in secondary");
1429
1430    // Test case 4: replay some edits, ensure that memstore is dropped.
1431    assertEquals(0, secondaryRegion.getMemStoreDataSize());
1432    putDataWithFlushes(primaryRegion, 400, 400, 0);
1433    numRows = 400;
1434
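    // Replay only the data edits, skipping the flush markers, so that the secondary
    // accumulates memstore data that the refreshStoreFiles() call below should drop.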
1435    reader = createWALReaderForPrimary();
1436    while (true) {
1437      WAL.Entry entry = reader.next();
1438      if (entry == null) {
1439        break;
1440      }
1441      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1442      if (flush != null) {
1443        // do not replay flush
1444      } else {
1445        replayEdit(secondaryRegion, entry);
1446      }
1447    }
1448
1449    assertTrue(secondaryRegion.getMemStoreDataSize() > 0);
1450
1451    secondaryRegion.refreshStoreFiles();
1452
1453    assertEquals(0, secondaryRegion.getMemStoreDataSize());
1454
1455    LOG.info("-- Verifying edits from primary");
1456    verifyData(primaryRegion, 0, numRows, cq, families);
1457    LOG.info("-- Verifying edits from secondary");
1458    verifyData(secondaryRegion, 0, numRows, cq, families);
1459  }
1460
1461  /**
1462   * Paths can be qualified or not. This does the assertion using String->Path conversion.
1463   */
1464  private void assertPathListsEqual(List<String> list1, List<String> list2) {
1465    List<Path> l1 = new ArrayList<>(list1.size());
1466    for (String path : list1) {
1467      l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1468    }
1469    List<Path> l2 = new ArrayList<>(list2.size());
1470    for (String path : list2) {
1471      l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1472    }
1473    assertEquals(l1, l2);
1474  }
1475
1476  private void disableReads(HRegion region) {
1477    region.setReadsEnabled(false);
1478    try {
1479      verifyData(region, 0, 1, cq, families);
1480      fail("Should have failed with IOException");
1481    } catch (IOException ex) {
1482      // expected
1483    }
1484  }
1485
1486  private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
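    // Replayed mutations skip the WAL (they already came from the primary's WAL) and are
    // applied through batchReplay under the given replay sequence id.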
1487    put.setDurability(Durability.SKIP_WAL);
1488    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
1489    region.batchReplay(new MutationReplay[] { mutation }, replaySeqId);
1490  }
1491
1492  /**
1493   * Tests replaying a bulk load event marker. Checks that the bulk loaded files are picked up
1494   */
1495  @Test
1496  public void testReplayBulkLoadEvent() throws IOException {
1497    LOG.info("testReplayBulkLoadEvent starts");
1498    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
1499
1500    // close the region and open again.
1501    primaryRegion.close();
1502    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1503
1504    // bulk load a file into primary region
1505    byte[] randomValues = new byte[20];
1506    Bytes.random(randomValues);
1507    Path testPath = TEST_UTIL.getDataTestDirOnTestFS();
1508
1509    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
1510    int expectedLoadFileCount = 0;
1511    for (byte[] family : families) {
1512      familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues)));
1513      expectedLoadFileCount++;
1514    }
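    // Bulk load one HFile per family into the primary; this writes a BULK_LOAD event marker
    // into the primary's WAL, which is located and replayed below.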
1515    primaryRegion.bulkLoadHFiles(familyPaths, false, null);
1516
1517    // now replay the edits and the bulk load marker
1518    reader = createWALReaderForPrimary();
1519
1520    LOG.info("-- Replaying edits and region events in secondary");
1521    BulkLoadDescriptor bulkloadEvent = null;
1522    while (true) {
1523      WAL.Entry entry = reader.next();
1524      if (entry == null) {
1525        break;
1526      }
1527      bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
1528      if (bulkloadEvent != null) {
1529        break;
1530      }
1531    }
1532
1533    // we should have 1 bulk load event
1534    assertNotNull(bulkloadEvent);
1535    assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());
1536
1537    // replay the bulk load event
1538    secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);
1539
1540    List<String> storeFileName = new ArrayList<>();
1541    for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
1542      storeFileName.addAll(storeDesc.getStoreFileList());
1543    }
1544    // assert that the bulk loaded files are picked up
1545    for (HStore s : secondaryRegion.getStores()) {
1546      for (HStoreFile sf : s.getStorefiles()) {
1547        storeFileName.remove(sf.getPath().getName());
1548      }
1549    }
1550    assertTrue("Some bulk loaded store files were not picked up: " + storeFileName,
      storeFileName.isEmpty());
1551
1552    LOG.info("-- Verifying edits from secondary");
1553    for (byte[] family : families) {
1554      assertGet(secondaryRegion, family, randomValues);
1555    }
1556  }
1557
1558  @Test
1559  public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
1560    // tests replaying a flush commit marker where the flush output file has already been
1561    // compacted away on the primary and also deleted from the archive directory
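    // The fake paths and Long.MAX_VALUE sequence number below simulate such a stale marker;
    // this test (and the three that follow) passes as long as the replay does not throw.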
1562    secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder()
1563      .setFlushSequenceNumber(Long.MAX_VALUE)
1564      .setTableName(UnsafeByteOperations
1565        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
1566      .setAction(FlushAction.COMMIT_FLUSH)
1567      .setEncodedRegionName(
1568        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1569      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1570      .addStoreFlushes(StoreFlushDescriptor.newBuilder()
1571        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1572        .setStoreHomeDir("/store_home_dir").addFlushOutput("/foo/baz/123").build())
1573      .build());
1574  }
1575
1576  @Test
1577  public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
1578    // tests replaying a compaction marker where the compaction output file has already been
1579    // compacted away on the primary and also deleted from the archive directory
1580    secondaryRegion
1581      .replayWALCompactionMarker(
1582        CompactionDescriptor.newBuilder()
1583          .setTableName(UnsafeByteOperations
1584            .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
1585          .setEncodedRegionName(
1586            UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1587          .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])).addCompactionInput("/123")
1588          .addCompactionOutput("/456").setStoreHomeDir("/store_home_dir")
1589          .setRegionName(
1590            UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1591          .build(),
1592        true, true, Long.MAX_VALUE);
1593  }
1594
1595  @Test
1596  public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
1597    // tests replaying a region open event marker where the region files have already been
1598    // compacted away on the primary and also deleted from the archive directory
1599    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
1600      .setTableName(UnsafeByteOperations
1601        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
1602      .setEncodedRegionName(
1603        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1604      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1605      .setEventType(EventType.REGION_OPEN)
1606      .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
1607      .setLogSequenceNumber(Long.MAX_VALUE)
1608      .addStores(
1609        StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1610          .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build())
1611      .build());
1612  }
1613
1614  @Test
1615  public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
1616    // tests replaying a bulk load event marker where the bulk loaded files have already been
1617    // compacted away on the primary and also deleted from the archive directory
1618    secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
1619      .setTableName(
1620        ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName()))
1621      .setEncodedRegionName(
1622        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1623      .setBulkloadSeqNum(Long.MAX_VALUE)
1624      .addStores(
1625        StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1626          .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build())
1627      .build());
1628  }
1629
1630  private String createHFileForFamilies(Path testPath, byte[] family, byte[] valueBytes)
1631    throws IOException {
1632    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
1633    // TODO We need a way to do this without creating files
1634    Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString());
1635    FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
1636    try {
1637      hFileFactory.withOutputStream(out);
1638      hFileFactory.withFileContext(new HFileContextBuilder().build());
1639      HFile.Writer writer = hFileFactory.create();
1640      try {
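        // Write a single cell whose row, qualifier, and value are all valueBytes; the
        // assertGet() checks in testReplayBulkLoadEvent read this row back per family.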
1641        writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0L,
1642          KeyValue.Type.Put.getCode(), valueBytes)));
1643      } finally {
1644        writer.close();
1645      }
1646    } finally {
1647      out.close();
1648    }
1649    return testFile.toString();
1650  }
1651
1652  /**
1653   * Puts a total of numRows + numRowsAfterFlush records, indexed with numeric row keys, flushing
1654   * every flushInterval records. It then puts numRowsAfterFlush more rows without a flush at the
1655   * end. For example, (region, 100, 300, 0) writes rows 0 to 299, flushing after every 100 rows.
1656   */
1657  private void putDataWithFlushes(HRegion region, int flushInterval, int numRows,
1658    int numRowsAfterFlush) throws IOException {
1659    int start = 0;
1660    for (; start < numRows; start += flushInterval) {
1661      LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval));
1662      putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
1663      LOG.info("-- Flushing primary, creating 3 files for 3 stores");
1664      region.flush(true);
1665    }
1666    LOG.info("-- Writing some more data to primary, not flushing");
1667    putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
1668  }
1669
1670  private void putDataByReplay(HRegion region, int startRow, int numRows, byte[] qf,
1671    byte[]... families) throws IOException {
1672    for (int i = startRow; i < startRow + numRows; i++) {
1673      Put put = new Put(Bytes.toBytes("" + i));
1674      put.setDurability(Durability.SKIP_WAL);
1675      for (byte[] family : families) {
1676        put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null);
1677      }
1678      replay(region, put, i + 1);
1679    }
1680  }
1681
1682  private static HRegion initHRegion(byte[] tableName, String callingMethod, byte[]... families)
1683    throws IOException {
1684    return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
1685      callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families);
1686  }
1687
1688  private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
1689    String callingMethod, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
1690    byte[]... families) throws IOException {
1691    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
1692      isReadOnly, durability, wal, families);
1693  }
1694}