/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.FilterInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.MemStoreSizing;
import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test replay of edits out of a WAL split.
 */
public abstract class AbstractTestWALReplay {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class);
  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
  private Path hbaseRootDir = null;
  private String logName;
  private Path oldLogDir;
  private Path logDir;
  private FileSystem fs;
  private Configuration conf;
  private WALFactory wals;

  @Rule
  public final TestName currentTest = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // The below config supported by 0.20-append and CDH3b2
    conf.setInt("dfs.client.block.recovery.retries", 2);
    TEST_UTIL.startMiniCluster(3);
    Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
    LOG.info("hbase.rootdir=" + hbaseRootDir);
    CommonFSUtils.setRootDir(conf, hbaseRootDir);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
    this.hbaseRootDir = CommonFSUtils.getRootDir(this.conf);
    this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName = ServerName
      .valueOf(currentTest.getMethodName() + "-manual", 16010, ee.currentTime()).toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(this.hbaseRootDir, logName);
    if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
      TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
    }
    this.wals = new WALFactory(conf, currentTest.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
    this.wals.close();
    TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
  }

  /*
   * @param p Directory to cleanup
   */
  private void deleteDir(final Path p) throws IOException {
    if (this.fs.exists(p)) {
      if (!this.fs.delete(p, true)) {
        throw new IOException("Failed remove of " + p);
      }
    }
  }

  /**
   * Test that edits are replayed correctly when a region with multiple column families is moved
   * between region servers and the original server is then aborted.
   */
  @Test
  public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
    final TableName tableName = TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF");
    byte[] family1 = Bytes.toBytes("cf1");
    byte[] family2 = Bytes.toBytes("cf2");
    byte[] qualifier = Bytes.toBytes("q");
    byte[] value = Bytes.toBytes("testV");
    byte[][] families = { family1, family2 };
    TEST_UTIL.createTable(tableName, families);
    Table htable = TEST_UTIL.getConnection().getTable(tableName);
    Put put = new Put(Bytes.toBytes("r1"));
    put.addColumn(family1, qualifier, value);
    htable.put(put);
    ResultScanner resultScanner = htable.getScanner(new Scan());
    int count = 0;
    while (resultScanner.next() != null) {
      count++;
    }
    resultScanner.close();
    assertEquals(1, count);

    MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
    List<HRegion> regions = hbaseCluster.getRegions(tableName);
    assertEquals(1, regions.size());

    // move region to another regionserver
    Region destRegion = regions.get(0);
    int originServerNum = hbaseCluster.getServerWith(destRegion.getRegionInfo().getRegionName());
    assertTrue("Please start more than 1 regionserver",
      hbaseCluster.getRegionServerThreads().size() > 1);
    int destServerNum = 0;
    while (destServerNum == originServerNum) {
      destServerNum++;
    }
    HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
    HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
    // move region to destination regionserver
    TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), destServer.getServerName());

    // delete the row
    Delete del = new Delete(Bytes.toBytes("r1"));
    htable.delete(del);
    resultScanner = htable.getScanner(new Scan());
    count = 0;
    while (resultScanner.next() != null) {
      count++;
    }
    resultScanner.close();
    assertEquals(0, count);

    // flush the region and run a major compaction
    HRegion region =
      (HRegion) destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName());
    region.flush(true);
    // trigger and run a major compaction on all stores
    for (HStore store : region.getStores()) {
      store.triggerMajorCompaction();
    }
    region.compact(true);

    // move region back to the origin regionserver
    TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), originServer.getServerName());
    // abort the origin regionserver
    originServer.abort("testing");
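    // Aborting the origin server forces its WALs to be split and the region's edits (including
    // the delete of 'r1') to be replayed when the region is reassigned; the Get below verifies
    // that replay does not resurrect the deleted row.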

    // see what we get
    Result result = htable.get(new Get(Bytes.toBytes("r1")));
    assertTrue("Row is deleted, but we got " + result, result == null || result.isEmpty());
    resultScanner.close();
  }

  /**
   * Tests for hbase-2727.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2727">HBASE-2727</a>
   */
  @Test
  public void test2727() throws Exception {
    // Test being able to have > 1 set of edits in the recovered.edits directory.
    // Ensure edits are replayed properly.
    final TableName tableName = TableName.valueOf("test2727");

    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    deleteDir(basedir);

    HTableDescriptor htd = createBasic3FamilyHTD(tableName);
    Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtility.closeRegionAndWAL(region2);
    final byte[] rowName = tableName.getName();

    WAL wal1 = createWAL(this.conf, hbaseRootDir, logName);
    // Add 1k to each family.
    final int countPerFamily = 1000;

    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getFamiliesKeys()) {
      scopes.put(fam, 0);
    }
    for (HColumnDescriptor hcd : htd.getFamilies()) {
      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal1, htd, mvcc,
        scopes);
    }
    wal1.shutdown();
    runWALSplit(this.conf);

    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    // Add 1k to each family.
    for (HColumnDescriptor hcd : htd.getFamilies()) {
      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal2, htd, mvcc,
        scopes);
    }
    wal2.shutdown();
    runWALSplit(this.conf);

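    // At this point the two split runs have left two separate edit files (3k edits each) under
    // the region's recovered.edits directory; opening the region below must replay both of them.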
    WAL wal3 = createWAL(this.conf, hbaseRootDir, logName);
    try {
      HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3);
      long seqid = region.getOpenSeqNum();
      // The region opens with sequenceId 1. After replaying the 6k edits its sequence number
      // reaches 6k + 1, i.e. one past the mvcc write point.
      assertTrue(seqid > mvcc.getWritePoint());
      assertEquals(seqid - 1, mvcc.getWritePoint());
      LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", mvcc.getReadPoint(): "
        + mvcc.getReadPoint());

      // TODO: Scan all.
      region.close();
    } finally {
      wal3.close();
    }
  }

  /**
   * Test case of an HRegion that is made up of bulk loaded files only. Assert that we don't
   * 'crash'.
   */
  @Test
  public void testRegionMadeOfBulkLoadedFilesOnly() throws IOException, SecurityException,
    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
    final TableName tableName = TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
    deleteDir(basedir);
    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
    Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtility.closeRegionAndWAL(region2);
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);

    byte[] family = htd.getFamilies().iterator().next().getName();
    Path f = new Path(basedir, "hfile");
    HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
      Bytes.toBytes("z"), 10);
    List<Pair<byte[], String>> hfs = new ArrayList<>(1);
    hfs.add(Pair.newPair(family, f.toString()));
    region.bulkLoadHFiles(hfs, true, null);

    // Add an edit so there is something in the WAL.
    byte[] row = tableName.getName();
    region.put((new Put(row)).addColumn(family, family, family));
    wal.sync();
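    // 10 rows from the bulk loaded HFile above plus the single Put, hence 11.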
    final int rowsInsertedCount = 11;

    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));

    // Now 'crash' the region by stealing its wal
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        runWALSplit(newConf);
        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);

        HRegion region2 =
          HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2);
        long seqid2 = region2.getOpenSeqNum();
        assertTrue(seqid2 > -1);
        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));

        // We cannot close the original WAL; it was appropriated when we split.
        region2.close();
        wal2.close();
        return null;
      }
    });
  }

  /**
   * HRegion test case that is made of a major compacted HFile (created with three bulk loaded
   * files) and an edit in the memstore. This is for HBASE-10958 "[dataloss] Bulk loading with
   * seqids can prevent some log entries from being replayed".
   */
  @Test
  public void testCompactedBulkLoadedFiles() throws IOException, SecurityException,
    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
    final TableName tableName = TableName.valueOf("testCompactedBulkLoadedFiles");
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
    deleteDir(basedir);
    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtility.closeRegionAndWAL(region2);
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);

    // Add an edit so there is something in the WAL.
    byte[] row = tableName.getName();
    byte[] family = htd.getFamilies().iterator().next().getName();
    region.put((new Put(row)).addColumn(family, family, family));
    wal.sync();

    List<Pair<byte[], String>> hfs = new ArrayList<>(1);
    for (int i = 0; i < 3; i++) {
      Path f = new Path(basedir, "hfile" + i);
      HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
        Bytes.toBytes(i + "50"), 10);
      hfs.add(Pair.newPair(family, f.toString()));
    }
    region.bulkLoadHFiles(hfs, true, null);
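    // One row from the Put plus three bulk loaded HFiles of 10 rows each, hence 31.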
    final int rowsInsertedCount = 31;
    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));

    // major compact to turn all the bulk loaded files into one normal file
    region.compact(true);
    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));

    // Now 'crash' the region by stealing its wal
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        runWALSplit(newConf);
        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);

        HRegion region2 =
          HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2);
        long seqid2 = region2.getOpenSeqNum();
        assertTrue(seqid2 > -1);
        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));

        // We cannot close the original WAL; it was appropriated when we split.
        region2.close();
        wal2.close();
        return null;
      }
    });
  }

  /**
   * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify
   * seqids.
   */
  @Test
  public void testReplayEditsWrittenViaHRegion() throws IOException, SecurityException,
    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
    final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion");
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
    deleteDir(basedir);
    final byte[] rowName = tableName.getName();
    final int countPerFamily = 10;
    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtility.closeRegionAndWAL(region3);
    // Write countPerFamily edits into the three families. Flush one of the families during the
    // load of edits so its seqid differs from the others, to verify we do the right thing when
    // families have different seqids.
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
    long seqid = region.getOpenSeqNum();
    boolean first = true;
    for (HColumnDescriptor hcd : htd.getFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
      if (first) {
        // Flush the first family so at least one family has a seqid different from the rest.
        region.flush(true);
        first = false;
      }
    }
    // Now assert edits made it in.
    final Get g = new Get(rowName);
    Result result = region.get(g);
    assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
    // Now close the region (without flush), split the log, reopen the region and assert that
    // replay of the log has the correct effect: our seqids are calculated correctly so all edits
    // in the logs are seen as 'stale'/old.
    region.close(true);
    wal.shutdown();
    runWALSplit(this.conf);
    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);
    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());

    // Next test. Add more edits, then 'crash' this region by stealing its wal
    // out from under it and assert that replay of the log adds the edits back
    // correctly when the region is opened again.
    for (HColumnDescriptor hcd : htd.getFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
    }
    // Get count of edits.
    final Result result2 = region2.get(g);
    assertEquals(2 * result.size(), result2.size());
    wal2.sync();
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        runWALSplit(newConf);
        FileSystem newFS = FileSystem.get(newConf);
        // Make a new wal for the new region open.
        WAL wal3 = createWAL(newConf, hbaseRootDir, logName);
        final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
        HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
          @Override
          protected void restoreEdit(HStore s, Cell cell, MemStoreSizing memstoreSizing) {
            super.restoreEdit(s, cell, memstoreSizing);
            countOfRestoredEdits.incrementAndGet();
          }
        };
        long seqid3 = region3.initialize();
        Result result3 = region3.get(g);
        // Assert that the count of cells is the same as before the crash.
        assertEquals(result2.size(), result3.size());
        assertEquals(htd.getFamilies().size() * countPerFamily, countOfRestoredEdits.get());

        // We cannot close the original WAL; it was appropriated when we split.
        region3.close();
        wal3.close();
        return null;
      }
    });
  }

  /**
   * Test that we recover correctly when there is a failure in between the flushes, i.e. some
   * stores were flushed but others were not. Unfortunately there is no easy hook to flush at a
   * store level, so we get around this by flushing at the region level and then deleting the
   * newly flushed store file for one of the stores. This puts us back in the situation where all
   * but that store got flushed before the region died. We then restart the region and verify
   * that the edits were replayed.
   */
  @Test
  public void testReplayEditsAfterPartialFlush() throws IOException, SecurityException,
    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
    final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion");
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
    deleteDir(basedir);
    final byte[] rowName = tableName.getName();
    final int countPerFamily = 10;
    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtility.closeRegionAndWAL(region3);
    // Write countPerFamily edits into the three families.
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
    long seqid = region.getOpenSeqNum();
    for (HColumnDescriptor hcd : htd.getFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
    }

    // Now assert edits made it in.
    final Get g = new Get(rowName);
    Result result = region.get(g);
    assertEquals(countPerFamily * htd.getFamilies().size(), result.size());

    // Let us flush the region
    region.flush(true);
    region.close(true);
    wal.shutdown();

    // Delete the store files of the second column family to simulate a failure in the middle of
    // the flush. We have 3 families; killing the middle one ensures that naively taking the
    // maximum flushed sequence id would make us fail.
    int cfCount = 0;
    for (HColumnDescriptor hcd : htd.getFamilies()) {
      cfCount++;
      if (cfCount == 2) {
        region.getRegionFileSystem().deleteFamily(hcd.getNameAsString());
      }
    }

    // Let us try to split and recover
    runWALSplit(this.conf);
    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);

    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());
  }

  // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
  // Only throws an exception if throwExceptionWhenFlushing is set to true.
  public static class CustomStoreFlusher extends DefaultStoreFlusher {
    // Switch between throwing and not throwing an exception during flush
    public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);

    public CustomStoreFlusher(Configuration conf, HStore store) {
      super(conf, store);
    }

    @Override
    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
      MonitoredTask status, ThroughputController throughputController,
      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
      if (throwExceptionWhenFlushing.get()) {
        throw new IOException("Simulated exception by tests");
      }
      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
        writerCreationTracker);
    }
  }

  /**
   * Test that we can recover the data correctly after aborting a flush. First we abort a flush
   * after writing some data, then write more data and flush again, and finally verify the data.
   */
  @Test
  public void testReplayEditsAfterAbortingFlush() throws IOException {
    final TableName tableName = TableName.valueOf("testReplayEditsAfterAbortingFlush");
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
    deleteDir(basedir);
    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtility.closeRegionAndWAL(region3);
    // Write edits spread across the three families.
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
    Mockito.doReturn(false).when(rsServices).isAborted();
    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
    when(rsServices.getConfiguration()).thenReturn(conf);
    Configuration customConf = new Configuration(this.conf);
    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
      CustomStoreFlusher.class.getName());
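    // Plugging CustomStoreFlusher in via the store engine config lets the test make the first
    // flush fail on demand (see throwExceptionWhenFlushing below).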
    HRegion region =
      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
    int writtenRowCount = 10;
    List<HColumnDescriptor> families = new ArrayList<>(htd.getFamilies());
    for (int i = 0; i < writtenRowCount; i++) {
      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
        Bytes.toBytes("val"));
      region.put(put);
    }

    // Now assert edits made it in.
    RegionScanner scanner = region.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));

    // Let us flush the region
    CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
    try {
      region.flush(true);
      fail("Injected exception was not thrown");
    } catch (IOException e) {
      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
      // Simulate aborting the server.
      Mockito.doReturn(true).when(rsServices).isAborted();
      // The region normally does not accept writes after a DroppedSnapshotException;
      // we mock around that for this test.
      region.setClosing(false);
    }
    // Write more data.
    int moreRow = 10;
    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
        Bytes.toBytes("val"));
      region.put(put);
    }
    writtenRowCount += moreRow;
    // Call flush again.
    CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
    try {
      region.flush(true);
    } catch (IOException t) {
      LOG.info(
        "Expected exception when flushing region because server is stopped: " + t.getMessage());
    }

    region.close(true);
    wal.shutdown();

    // Let us try to split and recover
    runWALSplit(this.conf);
    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    Mockito.doReturn(false).when(rsServices).isAborted();
    HRegion region2 =
      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null);
    scanner = region2.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));
  }

  private int getScannedCount(RegionScanner scanner) throws IOException {
    int scannedCount = 0;
    List<Cell> results = new ArrayList<>();
    while (true) {
      boolean existMore = scanner.next(results);
      if (!results.isEmpty()) {
        scannedCount++;
      }
      if (!existMore) {
        break;
      }
      results.clear();
    }
    return scannedCount;
  }

  /**
   * Create an HRegion with the result of a WAL split and test we only see the good edits.
   */
  @Test
  public void testReplayEditsWrittenIntoWAL() throws Exception {
    final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    deleteDir(basedir);

    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtility.closeRegionAndWAL(region2);
    final WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    final byte[] rowName = tableName.getName();
    final byte[] regionName = hri.getEncodedNameAsBytes();

    // Add 1k edits to each family.
    final int countPerFamily = 1000;
    Set<byte[]> familyNames = new HashSet<>();
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getFamiliesKeys()) {
      scopes.put(fam, 0);
    }
    for (HColumnDescriptor hcd : htd.getFamilies()) {
      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal, htd, mvcc,
        scopes);
      familyNames.add(hcd.getName());
    }

    // Add a cache flush; it shouldn't have any effect.
    wal.startCacheFlush(regionName, familyNames);
    wal.completeCacheFlush(regionName, HConstants.NO_SEQNUM);

    // Add an edit to another family; it should be skipped.
    WALEdit edit = new WALEdit();
    long now = ee.currentTime();
    edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName));
    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
      edit);

    // Delete the c family to verify deletes make it over.
    edit = new WALEdit();
    now = ee.currentTime();
    edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
      edit);

    // Sync.
    wal.sync();
    // Make a new conf and a new fs for the splitter to run on so we can take
    // over the old wal.
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtility.getDifferentUser(newConf, ".replay.wal.secondtime");
    user.runAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        runWALSplit(newConf);
        FileSystem newFS = FileSystem.get(newConf);
        // 100k seems to make for about 4 flushes during HRegion#initialize.
        newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100);
        // Make a new wal for the new region.
        WAL newWal = createWAL(newConf, hbaseRootDir, logName);
        final AtomicInteger flushcount = new AtomicInteger(0);
        try {
          final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
            @Override
            protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid,
              final Collection<HStore> storesToFlush, MonitoredTask status,
              boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException {
              LOG.info("InternalFlushCache Invoked");
              FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush,
                Mockito.mock(MonitoredTask.class), writeFlushWalMarker, tracker);
              flushcount.incrementAndGet();
              return fs;
            }
          };
          // The seq id this region has opened up with
          long seqid = region.initialize();

          // The mvcc write point after inserting the data.
          long writePoint = mvcc.getWritePoint();

          // We flushed during init.
          assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
          assertTrue((seqid - 1) == writePoint);

          Get get = new Get(rowName);
          Result result = region.get(get);
          // Make sure we only see the good edits.
          assertEquals(countPerFamily * (htd.getFamilies().size() - 1), result.size());
          region.close();
        } finally {
          newWal.close();
        }
        return null;
      }
    });
  }

  @Test
  // the following test is for HBASE-6065
  public void testSequentialEditLogSeqNum() throws IOException {
    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName);
    deleteDir(basedir);
    final byte[] rowName = tableName.getName();
    final int countPerFamily = 10;
    final HTableDescriptor htd = createBasic1FamilyHTD(tableName);

    // Mock the WAL
    MockWAL wal = createMockWAL();

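    // MockWAL suppresses completeCacheFlush until doCompleteCacheFlush is set, so the flushed
    // sequence id is not recorded, mimicking a crash between a flush and its completion.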
    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
    for (HColumnDescriptor hcd : htd.getFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
    }

    // Let us flush the region; but this time the completeCacheFlush is not yet done.
    region.flush(true);
    for (HColumnDescriptor hcd : htd.getFamilies()) {
      addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
    }
    // Get the current seq no.
    long latestSeqNumber = region.getReadPoint(null);
    // Allow the complete cache flush with the previous seq number obtained after the first set
    // of edits.
    wal.doCompleteCacheFlush = true;
    wal.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
    wal.shutdown();
    FileStatus[] listStatus = wal.getFiles();
    assertNotNull(listStatus);
    assertTrue(listStatus.length > 0);
    WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null,
      wals, null);
    FileStatus[] listStatus1 =
      this.fs.listStatus(new Path(CommonFSUtils.getWALTableDir(conf, tableName),
        new Path(hri.getEncodedName(), "recovered.edits")), new PathFilter() {
          @Override
          public boolean accept(Path p) {
            return !WALSplitUtil.isSequenceIdFile(p);
          }
        });
    int editCount = 0;
    for (FileStatus fileStatus : listStatus1) {
      editCount = Integer.parseInt(fileStatus.getPath().getName());
    }
    // The sequence numbers should be the same.
    assertEquals(
      "The sequence number of the recovered.edits file and the current edit seq should be same",
      latestSeqNumber, editCount);
  }

  /**
   * Test case for https://issues.apache.org/jira/browse/HBASE-15252.
   */
  @Test
  public void testDatalossWhenInputError() throws Exception {
    final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName);
    deleteDir(basedir);
    final byte[] rowName = tableName.getName();
    final int countPerFamily = 10;
    final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
    HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    Path regionDir = region1.getWALRegionDir();
    HBaseTestingUtility.closeRegionAndWAL(region1);

    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
    for (HColumnDescriptor hcd : htd.getFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
    }
    // Now assert edits made it in.
    final Get g = new Get(rowName);
    Result result = region.get(g);
    assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
    // Now close the region (without flush), split the log, reopen the region and assert that
    // replay of the log has the correct effect.
    region.close(true);
    wal.shutdown();

    runWALSplit(this.conf);

    // Here we make the DFSInputStream throw an IOException just after the WAL header.
    Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first();
    final long headerLength;
    try (WALStreamReader reader = WALFactory.createStreamReader(fs, editFile, conf)) {
      headerLength = reader.getPosition();
    }
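    // The spy below wraps the real FileSystem and, for the recovered-edits file, swaps the
    // underlying DFSInputStream for one that fails once the read position passes the header,
    // simulating an input error partway through replay.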
    FileSystem spyFs = spy(this.fs);
    doAnswer(new Answer<FSDataInputStream>() {

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
        Field field = FilterInputStream.class.getDeclaredField("in");
        field.setAccessible(true);
        final DFSInputStream in = (DFSInputStream) field.get(stream);
        DFSInputStream spyIn = spy(in);
        doAnswer(new Answer<Integer>() {

          private long pos;

          @Override
          public Integer answer(InvocationOnMock invocation) throws Throwable {
            if (pos >= headerLength) {
              throw new IOException("read over limit");
            }
            int b = (Integer) invocation.callRealMethod();
            if (b > 0) {
              pos += b;
            }
            return b;
          }
        }).when(spyIn).read(any(byte[].class), anyInt(), anyInt());
        doAnswer(new Answer<Void>() {

          @Override
          public Void answer(InvocationOnMock invocation) throws Throwable {
            invocation.callRealMethod();
            in.close();
            return null;
          }
        }).when(spyIn).close();
        field.set(stream, spyIn);
        return stream;
      }
    }).when(spyFs).open(eq(editFile));

    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region2;
    try {
      // log replay should fail due to the IOException, otherwise we may lose data.
      region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
      assertEquals(result.size(), region2.get(g).size());
    } catch (IOException e) {
      assertEquals("read over limit", e.getMessage());
    }
    region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
    assertEquals(result.size(), region2.get(g).size());
  }

  /**
   * Test case for https://issues.apache.org/jira/browse/HBASE-14949.
   */
  private void testNameConflictWhenSplit(boolean largeFirst)
    throws IOException, StreamLacksCapabilityException {
    final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    deleteDir(basedir);

    final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getFamiliesKeys()) {
      scopes.put(fam, 0);
    }
    HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtility.closeRegionAndWAL(region);
    final byte[] family = htd.getColumnFamilies()[0].getName();
    final byte[] rowName = tableName.getName();
    FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes);
    FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes);

    Path largeFile = new Path(logDir, "wal-1");
    Path smallFile = new Path(logDir, "wal-2");
    writerWALFile(largeFile, Arrays.asList(entry1, entry2));
    writerWALFile(smallFile, Arrays.asList(entry2));
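    // Both WALs carry the edit with sequence id 2, so splitting them may produce recovered.edits
    // output whose names collide (the HBASE-14949 scenario); the assertions below verify that no
    // edits are lost regardless of which file is split first.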
    FileStatus first, second;
    if (largeFirst) {
      first = fs.getFileStatus(largeFile);
      second = fs.getFileStatus(smallFile);
    } else {
      first = fs.getFileStatus(smallFile);
      second = fs.getFileStatus(largeFile);
    }
    WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals, null);
    WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals, null);
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
    assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());
    assertEquals(2, region.get(new Get(rowName)).size());
  }

  @Test
  public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException {
    testNameConflictWhenSplit(true);
  }

  @Test
  public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException {
    testNameConflictWhenSplit(false);
  }

  static class MockWAL extends FSHLog {
    boolean doCompleteCacheFlush = false;

    public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf)
      throws IOException {
      super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
    }

    @Override
    public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
      if (!doCompleteCacheFlush) {
        return;
      }
      super.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
    }
  }

  private HTableDescriptor createBasic1FamilyHTD(final TableName tableName) {
    HTableDescriptor htd = new HTableDescriptor(tableName);
    HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
    htd.addFamily(a);
    return htd;
  }

  private MockWAL createMockWAL() throws IOException {
    MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf);
    wal.init();
    // Lower the maximum recovery error count so the dfsclient doesn't linger retrying something
    // long gone.
    HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
    return wal;
  }

  // Flusher used in this test. Runs the flush inline on the region held in the field 'r' when a
  // flush is requested.
  static class TestFlusher implements FlushRequester {
    private HRegion r;

    @Override
    public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) {
      try {
        r.flush(false);
        return true;
      } catch (IOException e) {
        throw new RuntimeException("Exception flushing", e);
      }
    }

    @Override
    public boolean requestFlush(HRegion region, List<byte[]> families,
      FlushLifeCycleTracker tracker) {
      return true;
    }

    @Override
    public boolean requestDelayedFlush(HRegion region, long when) {
      return true;
    }

    @Override
    public void registerFlushRequestListener(FlushRequestListener listener) {

    }

    @Override
    public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
      return false;
    }

    @Override
    public void setGlobalMemStoreLimit(long globalMemStoreSize) {

    }
  }

  private WALKeyImpl createWALKey(final TableName tableName, final HRegionInfo hri,
    final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) {
    return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes);
  }

  private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee,
    int index) {
    byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index));
    byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index));
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
    return edit;
  }

  private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
    byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
    int index, NavigableMap<byte[], Integer> scopes) throws IOException {
    FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
      createWALEdit(rowName, family, ee, index), hri, true, null);
    entry.stampRegionSequenceId(mvcc.begin());
    return entry;
  }

  private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
    final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
    final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc,
    NavigableMap<byte[], Integer> scopes) throws IOException {
    for (int j = 0; j < count; j++) {
      wal.appendData(hri, createWALKey(tableName, hri, mvcc, scopes),
        createWALEdit(rowName, family, ee, j));
    }
    wal.sync();
  }

  public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
    EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException {
    List<Put> puts = new ArrayList<>();
    for (int j = 0; j < count; j++) {
      byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
      Put p = new Put(rowName);
      p.addColumn(family, qualifier, ee.currentTime(), rowName);
      r.put(p);
      puts.add(p);
    }
    return puts;
  }

  /*
   * Creates an HRI around an HTD that has <code>tableName</code> and three column families named
   * 'a', 'b', and 'c'.
   * @param tableName Name of table to use when we create the HTableDescriptor.
   */
  private HRegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) {
    return new HRegionInfo(tableName, null, null, false);
  }

  /*
   * Run the split. Verify that only a single split file was made.
   * @return The single split file made
   */
  private Path runWALSplit(final Configuration c) throws IOException {
    List<Path> splits =
      WALSplitter.split(hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
    // Split should generate only 1 file since there's only 1 region
    assertEquals("splits=" + splits, 1, splits.size());
    // Make sure the file exists
    assertTrue(fs.exists(splits.get(0)));
    LOG.info("Split file=" + splits.get(0));
    return splits.get(0);
  }

  private HTableDescriptor createBasic3FamilyHTD(final TableName tableName) {
    HTableDescriptor htd = new HTableDescriptor(tableName);
    HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
    htd.addFamily(a);
    HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
    htd.addFamily(b);
    HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
    htd.addFamily(c);
    return htd;
  }

  private void writerWALFile(Path file, List<FSWALEntry> entries)
    throws IOException, StreamLacksCapabilityException {
    fs.mkdirs(file.getParent());
    ProtobufLogWriter writer = new ProtobufLogWriter();
    writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file),
      StreamSlowMonitor.create(conf, "testMonitor"));
    for (FSWALEntry entry : entries) {
      writer.append(entry);
    }
    writer.sync(false);
    writer.close();
  }

  protected abstract WAL createWAL(Configuration c, Path hbaseRootDir, String logName)
    throws IOException;
}