/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * Tests of HRegion methods for replaying flush, compaction, region open, etc. events for
 * secondary region replicas.
 */
@SuppressWarnings("deprecation")
@Category(LargeTests.class)
public class TestHRegionReplayEvents {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHRegionReplayEvents.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionReplayEvents.class);
  @Rule
  public TestName name = new TestName();

  private static HBaseTestingUtil TEST_UTIL;

  public static Configuration CONF;
  private String dir;

  private byte[][] families =
    new byte[][] { Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") };

  // Test names
  protected byte[] tableName;
  protected String method;
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");
  protected byte[] cq = Bytes.toBytes("cq");

  // per test fields
  private Path rootDir;
  private TableDescriptor htd;
  private RegionServerServices rss;
  private RegionInfo primaryHri, secondaryHri;
  private HRegion primaryRegion, secondaryRegion;
  private WAL walPrimary, walSecondary;
  private WALStreamReader reader;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL = new HBaseTestingUtil();
    TEST_UTIL.startMiniDFSCluster(1);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
    TEST_UTIL.cleanupTestDir();
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Before
  public void setUp() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
    dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
    method = name.getMethodName();
    tableName = Bytes.toBytes(name.getMethodName());
    rootDir = new Path(dir + method);
    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method));
    for (byte[] family : families) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    }
    htd = builder.build();

    long time = EnvironmentEdgeManager.currentTime();
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    primaryHri =
      RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build();
    secondaryHri =
      RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build();

    WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir);
    walPrimary = wals.getWAL(primaryHri);
    walSecondary = wals.getWAL(secondaryHri);

    rss = mock(RegionServerServices.class);
    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
    when(rss.getConfiguration()).thenReturn(CONF);
    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF));
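    // a live executor is needed for the compacted files discharger: cleanup of compacted store
    // files (e.g. after replaying compaction markers) submits tasks to this executor through the
    // mocked RegionServerServices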
    String dischargerType =
      org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER.toString();
    ExecutorService es = new ExecutorService(dischargerType);
    es.startExecutorService(es.new ExecutorConfig().setCorePoolSize(1)
      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER));
    when(rss.getExecutorService()).thenReturn(es);
    primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
    primaryRegion.close();
    List<HRegion> regions = new ArrayList<>();
    regions.add(primaryRegion);
    Mockito.doReturn(regions).when(rss).getRegions();

    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);

    reader = null;
  }

  @After
  public void tearDown() throws Exception {
    if (reader != null) {
      reader.close();
    }

    if (primaryRegion != null) {
      HBaseTestingUtil.closeRegionAndWAL(primaryRegion);
    }
    if (secondaryRegion != null) {
      HBaseTestingUtil.closeRegionAndWAL(secondaryRegion);
    }

    EnvironmentEdgeManagerTestHelper.reset();
  }

  String getName() {
    return name.getMethodName();
  }

  // Some of the test cases are as follows:
  // 1. replay flush start marker again
  // 2. replay flush with smaller seqId than what is there in memstore snapshot
  // 3. replay flush with larger seqId than what is there in memstore snapshot
  // 4. replay flush commit without flush prepare. non droppable memstore
  // 5. replay flush commit without flush prepare. droppable memstore
  // 6. replay open region event
  // 7. replay open region event after flush start
  // 8. replay flush from an earlier seqId (test ignoring seqIds)
  // 9. start flush does not prevent region from closing.
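  //
  // Most tests below follow the same pattern: load data into the primary region, read the
  // primary's WAL back with createWALReaderForPrimary(), and selectively replay the edits and
  // the flush/compaction/region event markers into the secondary region, asserting on memstore
  // sizes, store file counts and data visibility at each step.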

  @Test
  public void testRegionReplicaSecondaryCannotFlush() throws IOException {
    // load some data and flush, ensuring that the secondary replica will not execute the flush

    // load some data to secondary by replaying
    putDataByReplay(secondaryRegion, 0, 1000, cq, families);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // flush region
    FlushResultImpl flush = (FlushResultImpl) secondaryRegion.flush(true);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // close the region, and inspect that it has not flushed
    Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false);
    // assert that no store files were created (the flush was never executed)
    for (List<HStoreFile> f : files.values()) {
      assertTrue(f.isEmpty());
    }
  }

  /**
   * Tests a case where we replay only a flush start marker, then the region is closed. This region
   * should not block indefinitely.
   */
  @Test
  public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- NOT Replaying flush commit in secondary");
        }
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0);
    // now close the region; the uncommitted flush should not cause the close to hang
    secondaryRegion.close();

    // verify that the memstore size is back to what it was
    assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize());
  }

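  /**
   * Replays a single WAL entry into the given region as a {@link Put}, skipping WAL meta edits
   * (those are handled by the callers). Returns the integer row index parsed from the row key,
   * which the data loading helpers encode as stringified integers.
   */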
  static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
    if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
      return 0; // handled elsewhere
    }
    Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0)));
    for (Cell cell : entry.getEdit().getCells()) {
      put.add(cell);
    }
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] { mutation }, entry.getKey().getSequenceId());
    return Integer.parseInt(Bytes.toString(put.getRow()));
  }

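  /**
   * Creates a reader over the primary region's current WAL file. NoEOFWALStreamReader is used so
   * that reaching the end of a WAL that is still being written to is reported as end-of-stream
   * rather than as an EOF error.
   */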
  private WALStreamReader createWALReaderForPrimary() throws FileNotFoundException, IOException {
    return NoEOFWALStreamReader.create(TEST_UTIL.getTestFileSystem(),
      AbstractFSWALProvider.getCurrentFileName(walPrimary), TEST_UTIL.getConfiguration());
  }

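  /**
   * Replays a batch of 100 puts where every group of 10 consecutive mutations shares the same
   * nonce group and nonce, verifying that batchReplay copes with repeated nonces.
   */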
  @Test
  public void testBatchReplayWithMultipleNonces() throws IOException {
    try {
      MutationReplay[] mutations = new MutationReplay[100];
      for (int i = 0; i < 100; i++) {
        Put put = new Put(Bytes.toBytes(i));
        put.setDurability(Durability.SYNC_WAL);
        for (byte[] family : this.families) {
          put.addColumn(family, this.cq, null);
        }
        long nonceNum = i / 10;
        mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum);
      }
      primaryRegion.batchReplay(mutations, 20);
    } catch (Exception e) {
      String msg = "Error while replaying a batch with multiple nonces. ";
      LOG.error(msg, e);
      fail(msg + e.getMessage());
    }
  }

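  /**
   * Replays edits, flush markers and a compaction marker from the primary into the secondary,
   * checking memstore sizes, store file counts and data visibility after each replayed event.
   */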
  @Test
  public void testReplayFlushesAndCompactions() throws IOException {
    // initiate a secondary region with some data.

    // load some data to primary and flush. 3 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 300, 100);
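    // (putDataWithFlushes, a helper defined later in this class, writes 'numRows' rows flushing
    // every 'flushInterval' rows, then writes 'numRowsAfterFlush' additional rows that are left
    // unflushed -- here 300 rows with 3 flushes, plus 100 unflushed rows, 400 rows in total.)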

    // compaction from primary
    LOG.info("-- Compacting primary, only 1 store");
    primaryRegion.compactStore(Bytes.toBytes("cf1"), NoLimitThroughputController.INSTANCE);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    int lastReplayed = 0;
    int expectedStoreFileCount = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      CompactionDescriptor compactionDesc =
        WALEdit.getCompaction(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        verifyData(secondaryRegion, 0, lastReplayed, cq, families);
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();
        long storeSize = store.getSize();
        long storeSizeUncompressed = store.getStoreSizeUncompressed();
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
          LOG.info("Memstore size reduced by: "
            + StringUtils.humanReadableInt(storeMemstoreSize - newStoreMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);

        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);

          // assert that the flush files are picked
          expectedStoreFileCount++;
          for (HStore s : secondaryRegion.getStores()) {
            assertEquals(expectedStoreFileCount, s.getStorefilesCount());
          }
          MemStoreSize newMss = store.getFlushableSize();
          assertTrue(mss.getHeapSize() > newMss.getHeapSize());

          // assert that the region memstore is smaller now
          long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
          assertTrue(regionMemstoreSize > newRegionMemstoreSize);

          // assert that the store sizes are bigger
          assertTrue(store.getSize() > storeSize);
          assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
          assertEquals(store.getSize(), store.getStorefilesSize());
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else if (compactionDesc != null) {
        secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);

        // assert that the compaction is applied
        for (HStore store : secondaryRegion.getStores()) {
          if (store.getColumnFamilyName().equals("cf1")) {
            assertEquals(1, store.getStorefilesCount());
          } else {
            assertEquals(expectedStoreFileCount, store.getStorefilesCount());
          }
        }
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    assertEquals(400 - 1, lastReplayed);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, 400, cq, families);

    LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
    verifyData(primaryRegion, 0, lastReplayed, cq, families);
    for (HStore store : primaryRegion.getStores()) {
      if (store.getColumnFamilyName().equals("cf1")) {
        assertEquals(1, store.getStorefilesCount());
      } else {
        assertEquals(expectedStoreFileCount, store.getStorefilesCount());
      }
    }
  }

  /**
   * Tests cases where we prepare a flush with some seqId and we receive other flush start markers
   * equal to, greater than or less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushStartMarkers() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");

    FlushDescriptor startFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();

        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          startFlushDesc = flushDesc;
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
          assertTrue(regionMemstoreSize > 0);
          assertTrue(mss.getHeapSize() > 0);

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
          LOG.info("Memstore size reduced by: "
            + StringUtils.humanReadableInt(storeMemstoreSize - newStoreMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
          verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);

        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)

    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 1: replay the same flush start marker again
    LOG.info("-- Replaying same flush start in secondary again");
    PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: replay a flush start marker with a smaller seqId
    FlushDescriptor startFlushDescSmallerSeqId =
      clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 3: replay a flush start marker with a larger seqId
    FlushDescriptor startFlushDescLargerSeqId =
      clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 2 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 200, 100);
    int numRows = 300;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // don't replay the first flush start marker, hold on to it, replay the second one
          if (startFlushDesc == null) {
            startFlushDesc = flushDesc;
          } else {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          if (commitFlushDesc == null) {
            commitFlushDesc = flushDesc; // hold on to the first flush commit marker
          }
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-200) in memstore snapshot
    // and some more data in memstores (rows 200-300)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker smaller than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
      + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped

    // assert that the region memstore is same as before
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertEquals(regionMemstoreSize, newRegionMemstoreSize);

    assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * larger than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          if (startFlushDesc == null) {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          // hold on to the flush commit marker but simulate a larger
          // flush commit seqId
          commitFlushDesc = FlushDescriptor.newBuilder(flushDesc)
            .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50).build();
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker larger than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
      + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped

    // assert that the region memstore is smaller than before, but not empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize > 0);
    assertTrue(regionMemstoreSize > newRegionMemstoreSize);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers. The
   * memstore edits should be dropped after the flush commit replay since they should be in flushed
   * files.
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
    throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers. The
   * memstore edits should not be dropped after the flush commit replay since not every edit will
   * be in flushed files (based on seqId).
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
    throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   */
  public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
    throws IOException {
    // load some data to primary and flush. 1 flush and, depending on whether the memstore is
    // droppable, some more unflushed data afterwards
    putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
    int numRows = droppableMemstore ? 100 : 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // do not replay flush start marker
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          commitFlushDesc = flushDesc; // hold on to the flush commit marker
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in the memstore without a snapshot,
    // and in the non-droppable case some more data in the memstore (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker without a flush start marker
    assertNull(secondaryRegion.getPrepareFlushResult());
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);

    // ensure all files are visible in secondary
    for (HStore store : secondaryRegion.getStores()) {
      assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null));
    }

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    if (droppableMemstore) {
      // assert that the memstore is dropped
      assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
    } else {
      assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
    }

    // assert that the region memstore is dropped (droppable case) or unchanged (we could not drop)
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    if (droppableMemstore) {
      assertEquals(0, newRegionMemstoreSize);
    } else {
      assertEquals(regionMemstoreSize, newRegionMemstoreSize);
    }

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

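  /** Returns a copy of the given flush descriptor with its flush sequence number replaced. */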
  private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
    return FlushDescriptor.newBuilder(flush).setFlushSequenceNumber(flushSeqId).build();
  }

  /**
   * Tests replaying region open markers from the primary region. Checks whether the files are
   * picked up.
   */
  @Test
  public void testReplayRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flush events
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        // don't replay edits
      }
    }

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());
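    // (the first open event was written when the primary was opened in setUp(); the close and
    // the second open come from the close/re-open performed just above)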

    // replay the first region open event.
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));

    // replay the close event as well
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertEquals(0, regionMemstoreSize);

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertEquals(0, newRegionMemstoreSize);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if
                                                         // any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we replay a region open event after a flush start but before receiving
   * the flush commit.
   */
  @Test
  public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // only replay flush start
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        }
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in the memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount = 2; // two flushes happened
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize newSnapshotSize = store.getSnapshotSize();
    assertEquals(0, newSnapshotSize.getDataSize());

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertEquals(0, newRegionMemstoreSize);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if
                                                         // any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests that edits coming in for replay are skipped if they have a smaller seqId than the seqId
   * of the last replayed region open event.
   */
  @Test
  public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
    List<WAL.Entry> edits = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flushes
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        edits.add(entry);
      }
    }

    // replay the region open event of the first open, but with the seqId of the second open.
    // this way none of the flush files will be picked up.
    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder(regionEvents.get(0))
      .setLogSequenceNumber(regionEvents.get(2).getLogSequenceNumber()).build());

    // replay edits from before the region close. If replay does not skip these edits, the
    // verification below will NOT fail (and the test will).
    for (WAL.Entry entry : edits) {
      replayEdit(secondaryRegion, entry);
    }

    boolean expectedFail = false;
    try {
      verifyData(secondaryRegion, 0, numRows, cq, families);
    } catch (AssertionError e) {
      expectedFail = true; // expected
    }
    if (!expectedFail) {
      fail("Should have failed this verification");
    }
  }

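  /**
   * Tests that replaying flush start and commit markers advances the secondary's mvcc read point
   * to the flush sequence number, making the newly picked up flush file visible.
   */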
  @Test
  public void testReplayFlushSeqIds() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the flush marker
    reader = createWALReaderForPrimary();

    long flushSeqId = -1;
    LOG.info("-- Replaying flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
          flushSeqId = flushDesc.getFlushSequenceNumber();
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
          assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
        }
      }
      // else do not replay
    }

    // TODO: what to do with this?
    // assert that the newly picked up flush file is visible
    long readPoint = secondaryRegion.getMVCC().getReadPoint();
    assertEquals(flushSeqId, readPoint);

    // after replay verify that everything is still visible
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  @Test
  public void testSeqIdsFromReplay() throws IOException {
    // test the case where seqIds coming from replayed WALEdits are persisted with their
    // original values and are made visible through the mvcc read point upon replay
    String method = name.getMethodName();
    byte[] tableName = Bytes.toBytes(method);
    byte[] family = Bytes.toBytes("family");

    HRegion region = initHRegion(tableName, family);
    try {
      // replay an entry whose seqId is larger than the current read point
      long readPoint = region.getMVCC().getReadPoint();
      long origSeqId = readPoint + 100;

      Put put = new Put(row).addColumn(family, row, row);
      put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
      replay(region, put, origSeqId);

      // read point should have advanced to this seqId
      assertGet(region, family, row);

      // region read point should have advanced to this seqId
      assertEquals(origSeqId, region.getReadPoint(null));

      // replay an entry that is smaller than current read point
      // caution: adding an entry below current read point might cause partial dirty reads. Normal
      // replay does not allow reads while replay is going on.
      put = new Put(row2).addColumn(family, row2, row2);
      put.setDurability(Durability.SKIP_WAL);
      replay(region, put, origSeqId - 50);

      assertGet(region, family, row2);
    } finally {
      region.close();
    }
  }

  /**
   * Tests that a region opened in secondary mode would not write region open / close events to its
   * WAL.
   */
  @Test
  public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
    secondaryRegion.close();
    walSecondary = spy(walSecondary);

    // test for region open and close
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));

    // test for replay prepare flush
    putDataByReplay(secondaryRegion, 0, 10, cq, families);
    secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder()
      .setFlushSequenceNumber(10)
      .setTableName(UnsafeByteOperations
        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
      .setAction(FlushAction.START_FLUSH)
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .build());

    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));

    secondaryRegion.close();
    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));
  }

  /**
   * Tests the reads enabled flag for the region. When unset, all reads should be rejected.
   */
  @Test
  public void testRegionReadsEnabledFlag() throws IOException {

    putDataByReplay(secondaryRegion, 0, 100, cq, families);

    verifyData(secondaryRegion, 0, 100, cq, families);

    // now disable reads
    secondaryRegion.setReadsEnabled(false);
    try {
      verifyData(secondaryRegion, 0, 100, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }

    // verify that we can still replay data
    putDataByReplay(secondaryRegion, 100, 100, cq, families);

    // now enable reads again
    secondaryRegion.setReadsEnabled(true);
    verifyData(secondaryRegion, 0, 200, cq, families);
  }

  /**
   * Tests the case where a request for flush cache is sent to the region, but region cannot flush.
   * It should write the flush request marker instead.
   */
  @Test
  public void testWriteFlushRequestMarker() throws IOException {
    // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
    FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertFalse(result.wroteFlushWalMarker);

    // request flush again, but this time with writeFlushRequestWalMarker = true
    result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertTrue(result.wroteFlushWalMarker);

    List<FlushDescriptor> flushes = Lists.newArrayList();
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        flushes.add(flush);
      }
    }

    assertEquals(1, flushes.size());
    assertNotNull(flushes.get(0));
    assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH flush
   * marker entry should restore the reads enabled status in the region and allow the reads to
   * continue.
   */
  @Test
  public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
    disableReads(secondaryRegion);

    // Test case 1: replaying a CANNOT_FLUSH marker, assuming it came from a triggered flush
    // request, restores readsEnabled
    primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }
1268
1269  /**
1270   * Test the case where the secondary region replica is not in reads enabled state because it is
1271   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
1272   * entries should restore the reads enabled status in the region and allow the reads to continue.
1273   */
1274  @Test
1275  public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
    // Test case 2: replaying FLUSH_START and FLUSH_COMMIT markers (as if they came from a
    // triggered flush) restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);
    // I seem to need to push more edits through so the WAL flushes on the local fs. This was not
    // needed before HBASE-15028. Not sure what's up. I can see that we have not flushed if I
    // pause the test here and then use WALPrettyPrinter to look at the WAL content. Doing the
    // same check before HBASE-15028 I can see all edits flushed to the WAL. Something's up, but
    // I can't figure it out... and this is the only test that seems to suffer this flush issue.
    // St.Ack 20160201
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      LOG.info(Objects.toString(entry));
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  /**
   * Tests the case where the secondary region replica is not in a reads-enabled state because it
   * is waiting for a flush or region open marker from the primary region. Replaying flush start
   * and commit entries alone (without the intervening edits, so the secondary memstore stays
   * empty) should restore the reads-enabled status in the region and allow reads to continue.
   */
  @Test
  public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
    // Replaying FLUSH_START and FLUSH_COMMIT markers, without replaying the flushed edits
    // themselves, restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  /**
   * Tests the case where the secondary region replica is not in a reads-enabled state because it
   * is waiting for a flush or region open marker from the primary region. Replaying a region open
   * event entry from the primary should restore the reads-enabled status in the region and allow
   * reads to continue.
   */
  @Test
  public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
    // Test case 3: replaying region open event markers restores readsEnabled
    disableReads(secondaryRegion);

    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
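    // Reopening the primary appends a region open event marker to its WAL, which the loop below
    // picks up and replays into the secondary.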

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }

      RegionEventDescriptor regionEventDesc =
        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (regionEventDesc != null) {
        secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }

  @Test
  public void testRefreshStoreFiles() throws IOException {
    assertEquals(0, primaryRegion.getStoreFileList(families).size());
    assertEquals(0, secondaryRegion.getStoreFileList(families).size());

    // Test case 1: refresh with an empty region
    secondaryRegion.refreshStoreFiles();
    assertEquals(0, secondaryRegion.getStoreFileList(families).size());

    // do one flush
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // refresh the store file list, and ensure that the files are picked up.
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: do 3 more flushes
    putDataWithFlushes(primaryRegion, 100, 300, 0);
    numRows = 300;

    // refresh the store file list, and ensure that the files are picked up.
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
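    // 1 flush from test case 1 plus 3 more here = 4 flushes, each creating one file per store.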
    assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    if (FSUtils.WINDOWS) {
      // compaction cannot move files while they are open in secondary on Windows. Skip remaining.
      return;
    }

    // Test case 3: compact primary files
    primaryRegion.compactStores();
    List<HRegion> regions = new ArrayList<>();
    regions.add(primaryRegion);
    Mockito.doReturn(regions).when(rss).getRegions();
    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
    cleaner.chore();
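    // The discharger archives the compacted-away files, so after the refresh below the secondary
    // should see only the single compacted file per store.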
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Replaying edits in secondary");

    // Test case 4: replay some edits, ensure that memstore is dropped.
    assertEquals(0, secondaryRegion.getMemStoreDataSize());
    putDataWithFlushes(primaryRegion, 400, 400, 0);
    numRows = 400;

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush == null) { // do not replay flush markers
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(secondaryRegion.getMemStoreDataSize() > 0);

    secondaryRegion.refreshStoreFiles();

    assertEquals(0, secondaryRegion.getMemStoreDataSize());

    LOG.info("-- Verifying edits from primary");
    verifyData(primaryRegion, 0, numRows, cq, families);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);
  }

  /**
   * Paths may or may not be qualified. This does the assertion after a String -> Path conversion
   * that strips the scheme and authority.
   */
  private void assertPathListsEqual(List<String> list1, List<String> list2) {
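    // e.g. "hdfs://localhost:8020/hbase/data/default/t1/region/f/file1" and
    // "/hbase/data/default/t1/region/f/file1" compare equal once the scheme and authority are
    // stripped by Path.getPathWithoutSchemeAndAuthority.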
    List<Path> l1 = new ArrayList<>(list1.size());
    for (String path : list1) {
      l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    List<Path> l2 = new ArrayList<>(list2.size());
    for (String path : list2) {
      l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    assertEquals(l1, l2);
  }

  private void disableReads(HRegion region) {
    region.setReadsEnabled(false);
    try {
      verifyData(region, 0, 1, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }
  }

  private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] { mutation }, replaySeqId);
  }

  /**
   * Tests replaying a bulk load event marker from the primary region. Checks whether the bulk
   * loaded files are picked up by the secondary.
   */
  @Test
  public void testReplayBulkLoadEvent() throws IOException {
    LOG.info("testReplayBulkLoadEvent starts");
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush

    // close the region and open it again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // bulk load a file into primary region
    byte[] randomValues = new byte[20];
    Bytes.random(randomValues);
    Path testPath = TEST_UTIL.getDataTestDirOnTestFS();

    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
    int expectedLoadFileCount = 0;
    for (byte[] family : families) {
      familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues)));
      expectedLoadFileCount++;
    }
    primaryRegion.bulkLoadHFiles(familyPaths, false, null);
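    // The bulk load appends a BULK_LOAD event marker to the primary WAL; the loop below scans
    // for it so it can be replayed into the secondary.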

    // now replay the edits and the bulk load marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and region events in secondary");
    BulkLoadDescriptor bulkloadEvent = null;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
      if (bulkloadEvent != null) {
        break;
      }
    }

    // we should have 1 bulk load event
    assertNotNull(bulkloadEvent);
    assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());

    // replay the bulk load event
    secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);

    List<String> storeFileName = new ArrayList<>();
    for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
      storeFileName.addAll(storeDesc.getStoreFileList());
    }
    // assert that the bulk loaded files are picked up
    for (HStore s : secondaryRegion.getStores()) {
      for (HStoreFile sf : s.getStorefiles()) {
        storeFileName.remove(sf.getPath().getName());
      }
    }
    assertTrue("Found store files that were not loaded: " + storeFileName,
      storeFileName.isEmpty());

    LOG.info("-- Verifying edits from secondary");
    for (byte[] family : families) {
      assertGet(secondaryRegion, family, randomValues);
    }
  }

  @Test
  public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
    // tests replaying a flush commit marker where the flush file has already been compacted
    // away in the primary and deleted from the archive directory
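    // The bogus flush output path and Long.MAX_VALUE sequence number guarantee the referenced
    // file cannot be found; the replay is expected to handle the missing file gracefully rather
    // than fail the test.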
    secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder()
      .setFlushSequenceNumber(Long.MAX_VALUE)
      .setTableName(UnsafeByteOperations
        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
      .setAction(FlushAction.COMMIT_FLUSH)
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .addStoreFlushes(StoreFlushDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir").addFlushOutput("/foo/baz/123").build())
      .build());
  }

  @Test
  public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
    // tests replaying a compaction marker where the compaction output file has already been
    // compacted away in the primary and deleted from the archive directory
    secondaryRegion
      .replayWALCompactionMarker(
        CompactionDescriptor.newBuilder()
          .setTableName(UnsafeByteOperations
            .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
          .setEncodedRegionName(
            UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
          .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])).addCompactionInput("/123")
          .addCompactionOutput("/456").setStoreHomeDir("/store_home_dir")
          .setRegionName(
            UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
          .build(),
        true, true, Long.MAX_VALUE);
  }

  @Test
  public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying a region open event marker where the region's store files have already
    // been compacted away in the primary and deleted from the archive directory
    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
      .setTableName(UnsafeByteOperations
        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .setEventType(EventType.REGION_OPEN)
      .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
      .setLogSequenceNumber(Long.MAX_VALUE)
      .addStores(
        StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
          .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build())
      .build());
  }

  @Test
  public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying a bulk load event marker where the bulk loaded files have already been
    // compacted away in the primary and deleted from the archive directory
    secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
      .setTableName(
        ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName()))
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setBulkloadSeqNum(Long.MAX_VALUE)
      .addStores(
        StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
          .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build())
      .build());
  }

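  // Writes a minimal HFile containing a single cell (row, qualifier, and value all set to
  // valueBytes) for the given family, and returns its path for use in a bulk load.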
  private String createHFileForFamilies(Path testPath, byte[] family, byte[] valueBytes)
    throws IOException {
    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    // TODO We need a way to do this without creating files
    Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString());
    FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
    try {
      hFileFactory.withOutputStream(out);
      hFileFactory.withFileContext(new HFileContextBuilder().build());
      HFile.Writer writer = hFileFactory.create();
      try {
        writer.append(new KeyValue(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
          .setRow(valueBytes).setFamily(family).setQualifier(valueBytes).setTimestamp(0L)
          .setType(KeyValue.Type.Put.getCode()).setValue(valueBytes).build()));
      } finally {
        writer.close();
      }
    } finally {
      out.close();
    }
    return testFile.toString();
  }

  /**
   * Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys, flushing
   * after every flushInterval records. It then puts numRowsAfterFlush more rows without a
   * trailing flush.
   */
  private void putDataWithFlushes(HRegion region, int flushInterval, int numRows,
    int numRowsAfterFlush) throws IOException {
    int start = 0;
    for (; start < numRows; start += flushInterval) {
      LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval));
      putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
      LOG.info("-- Flushing primary, creating 3 files for 3 stores");
      region.flush(true);
    }
    LOG.info("-- Writing some more data to primary, not flushing");
    putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
  }

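  // Replays puts directly into the given region via batchReplay with monotonically increasing
  // sequence ids, bypassing the WAL (SKIP_WAL durability).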
  private void putDataByReplay(HRegion region, int startRow, int numRows, byte[] qf,
    byte[]... families) throws IOException {
    for (int i = startRow; i < startRow + numRows; i++) {
      Put put = new Put(Bytes.toBytes("" + i));
      put.setDurability(Durability.SKIP_WAL);
      for (byte[] family : families) {
        put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null);
      }
      replay(region, put, i + 1);
    }
  }

  private static HRegion initHRegion(byte[] tableName, byte[]... families) throws IOException {
    return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), HConstants.EMPTY_START_ROW,
      HConstants.EMPTY_END_ROW, CONF, false, Durability.SYNC_WAL, null, families);
  }
}