001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.wal.WALFactory.META_WAL_PROVIDER;
021import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertThrows;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.io.InputStream;
031import java.lang.reflect.Method;
032import java.net.BindException;
033import java.util.ArrayList;
034import java.util.List;
035import java.util.NavigableMap;
036import java.util.TreeMap;
037import java.util.concurrent.atomic.AtomicBoolean;
038import java.util.stream.Collectors;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.FSDataInputStream;
041import org.apache.hadoop.fs.FSDataOutputStream;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.hbase.Cell;
046import org.apache.hadoop.hbase.CellUtil;
047import org.apache.hadoop.hbase.Coprocessor;
048import org.apache.hadoop.hbase.HBaseClassTestRule;
049import org.apache.hadoop.hbase.HBaseTestingUtility;
050import org.apache.hadoop.hbase.HConstants;
051import org.apache.hadoop.hbase.KeyValue;
052import org.apache.hadoop.hbase.ServerName;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
055import org.apache.hadoop.hbase.client.RegionInfo;
056import org.apache.hadoop.hbase.client.RegionInfoBuilder;
057import org.apache.hadoop.hbase.client.TableDescriptor;
058import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
059import org.apache.hadoop.hbase.codec.Codec;
060import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
061import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
062import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
063import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
064import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
065import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
066import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
067import org.apache.hadoop.hbase.testclassification.MediumTests;
068import org.apache.hadoop.hbase.testclassification.RegionServerTests;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.CommonFSUtils;
071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
072import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
073import org.apache.hadoop.hbase.util.Threads;
074import org.apache.hadoop.hbase.wal.WALFactory.Providers;
075import org.apache.hadoop.hdfs.DistributedFileSystem;
076import org.apache.hadoop.hdfs.MiniDFSCluster;
077import org.apache.hadoop.hdfs.protocol.HdfsConstants;
078import org.junit.After;
079import org.junit.AfterClass;
080import org.junit.Before;
081import org.junit.BeforeClass;
082import org.junit.ClassRule;
083import org.junit.Rule;
084import org.junit.Test;
085import org.junit.experimental.categories.Category;
086import org.junit.rules.TestName;
087import org.slf4j.Logger;
088import org.slf4j.LoggerFactory;
089
090/**
091 * WAL tests that can be reused across providers.
092 */
093@Category({ RegionServerTests.class, MediumTests.class })
094public class TestWALFactory {
095
096  @ClassRule
097  public static final HBaseClassTestRule CLASS_RULE =
098    HBaseClassTestRule.forClass(TestWALFactory.class);
099
100  private static final Logger LOG = LoggerFactory.getLogger(TestWALFactory.class);
101
102  protected static Configuration conf;
103  private static MiniDFSCluster cluster;
104  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
105  protected static Path hbaseDir;
106  protected static Path hbaseWALDir;
107
108  protected FileSystem fs;
109  protected Path dir;
110  protected WALFactory wals;
111  private ServerName currentServername;
112
113  @Rule
114  public final TestName currentTest = new TestName();
115
116  @Before
117  public void setUp() throws Exception {
118    fs = cluster.getFileSystem();
119    dir = new Path(hbaseDir, currentTest.getMethodName());
120    this.currentServername = ServerName.valueOf(currentTest.getMethodName(), 16010, 1);
121    wals = new WALFactory(conf, this.currentServername.toString());
122  }
123
124  @After
125  public void tearDown() throws Exception {
126    // testAppendClose closes the FileSystem, which will prevent us from closing cleanly here.
127    try {
128      wals.close();
129    } catch (IOException exception) {
130      LOG.warn("Encountered exception while closing wal factory. If you have other errors, this"
131        + " may be the cause. Message: " + exception);
132      LOG.debug("Exception details for failure to close wal factory.", exception);
133    }
134    FileStatus[] entries = fs.listStatus(new Path("/"));
135    for (FileStatus dir : entries) {
136      fs.delete(dir.getPath(), true);
137    }
138  }
139
140  @BeforeClass
141  public static void setUpBeforeClass() throws Exception {
142    CommonFSUtils.setWALRootDir(TEST_UTIL.getConfiguration(), new Path("file:///tmp/wal"));
143    // Make block sizes small.
144    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
145    // needed for testAppendClose()
146    // quicker heartbeat interval for faster DN death notification
147    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
148    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
149    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
150
151    // faster failover with cluster.shutdown();fs.close() idiom
152    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
153    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
154    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
155    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
156    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
157    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
158      SampleRegionWALCoprocessor.class.getName());
159    TEST_UTIL.startMiniDFSCluster(3);
160
161    conf = TEST_UTIL.getConfiguration();
162    cluster = TEST_UTIL.getDFSCluster();
163
164    hbaseDir = TEST_UTIL.createRootDir();
165    hbaseWALDir = TEST_UTIL.createWALRootDir();
166  }
167
168  @AfterClass
169  public static void tearDownAfterClass() throws Exception {
170    TEST_UTIL.shutdownMiniCluster();
171  }
172
173  @Test
174  public void canCloseSingleton() throws IOException {
175    WALFactory.getInstance(conf).close();
176  }
177
178  /**
179   * Just write multiple logs then split. Before fix for HADOOP-2283, this would fail.
180   */
181  @Test
182  public void testSplit() throws IOException {
183    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
184    final byte[] rowName = tableName.getName();
185    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
186    final int howmany = 3;
187    RegionInfo[] infos = new RegionInfo[3];
188    Path tableDataDir = CommonFSUtils.getTableDir(hbaseDir, tableName);
189    fs.mkdirs(tableDataDir);
190    Path tabledir = CommonFSUtils.getWALTableDir(conf, tableName);
191    fs.mkdirs(tabledir);
192    for (int i = 0; i < howmany; i++) {
193      infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i))
194        .setEndKey(Bytes.toBytes("" + (i + 1))).build();
195      fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
196      fs.mkdirs(new Path(tableDataDir, infos[i].getEncodedName()));
197      LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
198    }
199    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
200    scopes.put(Bytes.toBytes("column"), 0);
201
202    // Add edits for three regions.
203    for (int ii = 0; ii < howmany; ii++) {
204      for (int i = 0; i < howmany; i++) {
205        final WAL log = wals.getWAL(infos[i]);
206        for (int j = 0; j < howmany; j++) {
207          WALEdit edit = new WALEdit();
208          byte[] family = Bytes.toBytes("column");
209          byte[] qualifier = Bytes.toBytes(Integer.toString(j));
210          byte[] column = Bytes.toBytes("column:" + Integer.toString(j));
211          edit.add(
212            new KeyValue(rowName, family, qualifier, EnvironmentEdgeManager.currentTime(), column));
213          LOG.info("Region " + i + ": " + edit);
214          WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName,
215            EnvironmentEdgeManager.currentTime(), mvcc, scopes);
216          log.appendData(infos[i], walKey, edit);
217          walKey.getWriteEntry();
218        }
219        log.sync();
220        log.rollWriter(true);
221      }
222    }
223    wals.shutdown();
224    // The below calculation of logDir relies on insider information... WALSplitter should be
225    // connected better
226    // with the WAL system.... not requiring explicit path. The oldLogDir is just made up not used.
227    Path logDir = new Path(new Path(hbaseWALDir, HConstants.HREGION_LOGDIR_NAME),
228      this.currentServername.toString());
229    Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
230    List<Path> splits = WALSplitter.split(hbaseWALDir, logDir, oldLogDir, fs, conf, wals);
231    verifySplits(splits, howmany);
232  }
233
234  /**
235   * Test new HDFS-265 sync.
236   */
237  @Test
238  public void Broken_testSync() throws Exception {
239    TableName tableName = TableName.valueOf(currentTest.getMethodName());
240    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
241    // First verify that using streams all works.
242    Path p = new Path(dir, currentTest.getMethodName() + ".fsdos");
243    FSDataOutputStream out = fs.create(p);
244    out.write(tableName.getName());
245    Method syncMethod = null;
246    try {
247      syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {});
248    } catch (NoSuchMethodException e) {
249      try {
250        syncMethod = out.getClass().getMethod("sync", new Class<?>[] {});
251      } catch (NoSuchMethodException ex) {
252        fail("This version of Hadoop supports neither Syncable.sync() " + "nor Syncable.hflush().");
253      }
254    }
255    syncMethod.invoke(out, new Object[] {});
256    FSDataInputStream in = fs.open(p);
257    assertTrue(in.available() > 0);
258    byte[] buffer = new byte[1024];
259    int read = in.read(buffer);
260    assertEquals(tableName.getName().length, read);
261    out.close();
262    in.close();
263
264    final int total = 20;
265    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
266    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
267    scopes.put(tableName.getName(), 0);
268    final WAL wal = wals.getWAL(info);
269
270    for (int i = 0; i < total; i++) {
271      WALEdit kvs = new WALEdit();
272      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
273      wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
274        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
275    }
276    // Now call sync and try reading. Opening a Reader before you sync just
277    // gives you EOFE.
278    wal.sync();
279    // Open a Reader.
280    Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
281    int count = NoEOFWALStreamReader.count(wals, fs, walPath);
282    assertEquals(total, count);
283    // Add test that checks to see that an open of a Reader works on a file
284    // that has had a sync done on it.
285    for (int i = 0; i < total; i++) {
286      WALEdit kvs = new WALEdit();
287      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
288      wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
289        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
290    }
291    wal.sync();
292    count = NoEOFWALStreamReader.count(wals, fs, walPath);
293    assertTrue(count >= total);
294    // If I sync, should see double the edits.
295    wal.sync();
296    count = NoEOFWALStreamReader.count(wals, fs, walPath);
297    assertEquals(total * 2, count);
298    // Now do a test that ensures stuff works when we go over block boundary,
299    // especially that we return good length on file.
300    final byte[] value = new byte[1025 * 1024]; // Make a 1M value.
301    for (int i = 0; i < total; i++) {
302      WALEdit kvs = new WALEdit();
303      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
304      wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
305        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
306    }
307    // Now I should have written out lots of blocks. Sync then read.
308    wal.sync();
309    count = NoEOFWALStreamReader.count(wals, fs, walPath);
310    assertEquals(total * 3, count);
311    // shutdown and ensure that Reader gets right length also.
312    wal.shutdown();
313    count = NoEOFWALStreamReader.count(wals, fs, walPath);
314    assertEquals(total * 3, count);
315  }
316
317  private void verifySplits(final List<Path> splits, final int howmany) throws IOException {
318    assertEquals(howmany * howmany, splits.size());
319    for (int i = 0; i < splits.size(); i++) {
320      LOG.info("Verifying=" + splits.get(i));
321      try (WALStreamReader reader = wals.createStreamReader(fs, splits.get(i))) {
322        int count = 0;
323        String previousRegion = null;
324        long seqno = -1;
325        WAL.Entry entry = new WAL.Entry();
326        while ((entry = reader.next(entry)) != null) {
327          WALKey key = entry.getKey();
328          String region = Bytes.toString(key.getEncodedRegionName());
329          // Assert that all edits are for same region.
330          if (previousRegion != null) {
331            assertEquals(previousRegion, region);
332          }
333          LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getSequenceId());
334          assertTrue(seqno < key.getSequenceId());
335          seqno = key.getSequenceId();
336          previousRegion = region;
337          count++;
338        }
339        assertEquals(howmany, count);
340      }
341    }
342  }
343
344  /*
345   * We pass different values to recoverFileLease() so that different code paths are covered For
346   * this test to pass, requires: 1. HDFS-200 (append support) 2. HDFS-988 (SafeMode should freeze
347   * file operations [FSNamesystem.nextGenerationStampForBlock]) 3. HDFS-142 (on restart, maintain
348   * pendingCreates)
349   */
350  @Test
351  public void testAppendClose() throws Exception {
352    TableName tableName = TableName.valueOf(currentTest.getMethodName());
353    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
354
355    WAL wal = wals.getWAL(regionInfo);
356    int total = 20;
357
358    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
359    scopes.put(tableName.getName(), 0);
360    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
361    for (int i = 0; i < total; i++) {
362      WALEdit kvs = new WALEdit();
363      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
364      wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
365        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
366    }
367    // Now call sync to send the data to HDFS datanodes
368    wal.sync();
369    int namenodePort = cluster.getNameNodePort();
370    final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
371
372    // Stop the cluster. (ensure restart since we're sharing MiniDFSCluster)
373    try {
374      DistributedFileSystem dfs = cluster.getFileSystem();
375      dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
376      TEST_UTIL.shutdownMiniDFSCluster();
377      try {
378        // wal.writer.close() will throw an exception,
379        // but still call this since it closes the LogSyncer thread first
380        wal.shutdown();
381      } catch (IOException e) {
382        LOG.info(e.toString(), e);
383      }
384      fs.close(); // closing FS last so DFSOutputStream can't call close
385      LOG.info("STOPPED first instance of the cluster");
386    } finally {
387      // Restart the cluster
388      while (cluster.isClusterUp()) {
389        LOG.error("Waiting for cluster to go down");
390        Thread.sleep(1000);
391      }
392      assertFalse(cluster.isClusterUp());
393      cluster = null;
394      for (int i = 0; i < 100; i++) {
395        try {
396          cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort);
397          break;
398        } catch (BindException e) {
399          LOG.info("Sleeping.  BindException bringing up new cluster");
400          Threads.sleep(1000);
401        }
402      }
403      cluster.waitActive();
404      fs = cluster.getFileSystem();
405      LOG.info("STARTED second instance.");
406    }
407
408    // set the lease period to be 1 second so that the
409    // namenode triggers lease recovery upon append request
410    Method setLeasePeriod =
411      cluster.getClass().getDeclaredMethod("setLeasePeriod", new Class[] { Long.TYPE, Long.TYPE });
412    setLeasePeriod.setAccessible(true);
413    setLeasePeriod.invoke(cluster, 1000L, 1000L);
414    try {
415      Thread.sleep(1000);
416    } catch (InterruptedException e) {
417      LOG.info(e.toString(), e);
418    }
419
420    // Now try recovering the log, like the HMaster would do
421    final FileSystem recoveredFs = fs;
422    final Configuration rlConf = conf;
423
424    class RecoverLogThread extends Thread {
425      public Exception exception = null;
426
427      @Override
428      public void run() {
429        try {
430          RecoverLeaseFSUtils.recoverFileLease(recoveredFs, walPath, rlConf, null);
431        } catch (IOException e) {
432          exception = e;
433        }
434      }
435    }
436
437    RecoverLogThread t = new RecoverLogThread();
438    t.start();
439    // Timeout after 60 sec. Without correct patches, would be an infinite loop
440    t.join(60 * 1000);
441    if (t.isAlive()) {
442      t.interrupt();
443      throw new Exception("Timed out waiting for WAL.recoverLog()");
444    }
445
446    if (t.exception != null) throw t.exception;
447
448    // Make sure you can read all the content
449    int count = 0;
450    try (NoEOFWALStreamReader reader = NoEOFWALStreamReader.create(wals, fs, walPath)) {
451      WAL.Entry entry = new WAL.Entry();
452      while (reader.next(entry) != null) {
453        count++;
454        assertTrue("Should be one KeyValue per WALEdit", entry.getEdit().getCells().size() == 1);
455      }
456    }
457    assertEquals(total, count);
458
459    // Reset the lease period
460    setLeasePeriod.invoke(cluster, new Object[] { 60000L, 3600000L });
461  }
462
463  /**
464   * Tests that we can write out an edit, close, and then read it back in again.
465   */
466  @Test
467  public void testEditAdd() throws IOException {
468    int colCount = 10;
469    TableDescriptor htd =
470      TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
471        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
472    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
473    for (byte[] fam : htd.getColumnFamilyNames()) {
474      scopes.put(fam, 0);
475    }
476    byte[] row = Bytes.toBytes("row");
477    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
478
479    // Write columns named 1, 2, 3, etc. and then values of single byte
480    // 1, 2, 3...
481    long timestamp = EnvironmentEdgeManager.currentTime();
482    WALEdit cols = new WALEdit();
483    for (int i = 0; i < colCount; i++) {
484      cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
485        timestamp, new byte[] { (byte) (i + '0') }));
486    }
487    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(row)
488      .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build();
489    final WAL log = wals.getWAL(info);
490
491    final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(),
492      htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
493    log.sync(txid);
494    log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
495    log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
496    log.shutdown();
497    Path filename = AbstractFSWALProvider.getCurrentFileName(log);
498    // Now open a reader on the log and assert append worked.
499    try (NoEOFWALStreamReader reader = NoEOFWALStreamReader.create(wals, fs, filename)) {
500      // Above we added all columns on a single row so we only read one
501      // entry in the below... thats why we have '1'.
502      for (int i = 0; i < 1; i++) {
503        WAL.Entry entry = reader.next(null);
504        if (entry == null) break;
505        WALKey key = entry.getKey();
506        WALEdit val = entry.getEdit();
507        assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
508        assertTrue(htd.getTableName().equals(key.getTableName()));
509        Cell cell = val.getCells().get(0);
510        assertTrue(Bytes.equals(row, 0, row.length, cell.getRowArray(), cell.getRowOffset(),
511          cell.getRowLength()));
512        assertEquals((byte) (i + '0'), CellUtil.cloneValue(cell)[0]);
513        LOG.info(key + " " + val);
514      }
515    }
516  }
517
518  @Test
519  public void testAppend() throws IOException {
520    int colCount = 10;
521    TableDescriptor htd =
522      TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
523        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
524    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
525    for (byte[] fam : htd.getColumnFamilyNames()) {
526      scopes.put(fam, 0);
527    }
528    byte[] row = Bytes.toBytes("row");
529    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
530    // Write columns named 1, 2, 3, etc. and then values of single byte
531    // 1, 2, 3...
532    long timestamp = EnvironmentEdgeManager.currentTime();
533    WALEdit cols = new WALEdit();
534    for (int i = 0; i < colCount; i++) {
535      cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
536        timestamp, new byte[] { (byte) (i + '0') }));
537    }
538    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
539    final WAL log = wals.getWAL(hri);
540    final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
541      htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
542    log.sync(txid);
543    log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
544    log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
545    log.shutdown();
546    Path filename = AbstractFSWALProvider.getCurrentFileName(log);
547    // Now open a reader on the log and assert append worked.
548    try (WALStreamReader reader = wals.createStreamReader(fs, filename)) {
549      WAL.Entry entry = reader.next();
550      assertEquals(colCount, entry.getEdit().size());
551      int idx = 0;
552      for (Cell val : entry.getEdit().getCells()) {
553        assertTrue(
554          Bytes.equals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName()));
555        assertTrue(htd.getTableName().equals(entry.getKey().getTableName()));
556        assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
557          val.getRowLength()));
558        assertEquals((byte) (idx + '0'), CellUtil.cloneValue(val)[0]);
559        System.out.println(entry.getKey() + " " + val);
560        idx++;
561      }
562    }
563  }
564
565  /**
566   * Test that we can visit entries before they are appended
567   */
568  @Test
569  public void testVisitors() throws Exception {
570    final int COL_COUNT = 10;
571    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
572    final byte[] row = Bytes.toBytes("row");
573    final DumbWALActionsListener visitor = new DumbWALActionsListener();
574    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
575    long timestamp = EnvironmentEdgeManager.currentTime();
576    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
577    scopes.put(Bytes.toBytes("column"), 0);
578
579    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
580    final WAL log = wals.getWAL(hri);
581    log.registerWALActionsListener(visitor);
582    for (int i = 0; i < COL_COUNT; i++) {
583      WALEdit cols = new WALEdit();
584      cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
585        timestamp, new byte[] { (byte) (i + '0') }));
586      log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
587        EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
588    }
589    log.sync();
590    assertEquals(COL_COUNT, visitor.increments);
591    log.unregisterWALActionsListener(visitor);
592    WALEdit cols = new WALEdit();
593    cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(11)),
594      timestamp, new byte[] { (byte) (11 + '0') }));
595    log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
596      EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
597    log.sync();
598    assertEquals(COL_COUNT, visitor.increments);
599  }
600
601  /**
602   * A loaded WAL coprocessor won't break existing WAL test cases.
603   */
604  @Test
605  public void testWALCoprocessorLoaded() throws Exception {
606    // test to see whether the coprocessor is loaded or not.
607    WALCoprocessorHost host = wals.getWAL(null).getCoprocessorHost();
608    Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class);
609    assertNotNull(c);
610  }
611
612  static class DumbWALActionsListener implements WALActionsListener {
613    int increments = 0;
614
615    @Override
616    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
617      increments++;
618    }
619  }
620
621  @Test
622  public void testWALProviders() throws IOException {
623    Configuration conf = new Configuration();
624    WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
625    assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass());
626  }
627
628  @Test
629  public void testOnlySetWALProvider() throws IOException {
630    Configuration conf = new Configuration();
631    conf.set(WAL_PROVIDER, WALFactory.Providers.multiwal.name());
632    WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
633
634    assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getWALProvider().getClass());
635    assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getMetaProvider().getClass());
636  }
637
638  @Test
639  public void testOnlySetMetaWALProvider() throws IOException {
640    Configuration conf = new Configuration();
641    conf.set(META_WAL_PROVIDER, WALFactory.Providers.asyncfs.name());
642    WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
643
644    assertEquals(WALFactory.Providers.defaultProvider.clazz,
645      walFactory.getWALProvider().getClass());
646    assertEquals(WALFactory.Providers.asyncfs.clazz, walFactory.getMetaProvider().getClass());
647  }
648
649  @Test
650  public void testDefaultProvider() throws IOException {
651    final Configuration conf = new Configuration();
652    // AsyncFSWal is the default, we should be able to request any WAL.
653    final WALFactory normalWalFactory = new WALFactory(conf, this.currentServername.toString());
654    Class<? extends WALProvider> fshLogProvider =
655      normalWalFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
656    assertEquals(Providers.filesystem.clazz, fshLogProvider);
657
658    // Imagine a world where MultiWAL is the default
659    final WALFactory customizedWalFactory =
660      new WALFactory(conf, this.currentServername.toString()) {
661        @Override
662        Providers getDefaultProvider() {
663          return Providers.multiwal;
664        }
665      };
666    // If we don't specify a WALProvider, we should get the default implementation.
667    Class<? extends WALProvider> multiwalProviderClass =
668      customizedWalFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.multiwal.name());
669    assertEquals(Providers.multiwal.clazz, multiwalProviderClass);
670  }
671
672  @Test
673  public void testCustomProvider() throws IOException {
674    final Configuration config = new Configuration();
675    config.set(WALFactory.WAL_PROVIDER, IOTestProvider.class.getName());
676    final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
677    Class<? extends WALProvider> walProvider =
678      walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
679    assertEquals(IOTestProvider.class, walProvider);
680    WALProvider metaWALProvider = walFactory.getMetaProvider();
681    assertEquals(IOTestProvider.class, metaWALProvider.getClass());
682  }
683
684  @Test
685  public void testCustomMetaProvider() throws IOException {
686    final Configuration config = new Configuration();
687    config.set(WALFactory.META_WAL_PROVIDER, IOTestProvider.class.getName());
688    final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
689    Class<? extends WALProvider> walProvider =
690      walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
691    assertEquals(Providers.filesystem.clazz, walProvider);
692    WALProvider metaWALProvider = walFactory.getMetaProvider();
693    assertEquals(IOTestProvider.class, metaWALProvider.getClass());
694  }
695
696  @Test
697  public void testReaderClosedOnBadCodec() throws IOException {
698    // Create our own Configuration and WALFactory to avoid breaking other test methods
699    Configuration confWithCodec = new Configuration(conf);
700    confWithCodec.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, BrokenWALCellCodec.class,
701      Codec.class);
702    WALFactory customFactory = new WALFactory(confWithCodec, this.currentServername.toString());
703
704    // Hack a Proxy over the FileSystem so that we can track the InputStreams opened by
705    // the FileSystem and know if close() was called on those InputStreams.
706    List<InputStreamProxy> openedReaders = new ArrayList<>();
707    FileSystemProxy proxyFs = new FileSystemProxy(fs) {
708      @Override
709      public FSDataInputStream open(Path p) throws IOException {
710        InputStreamProxy is = new InputStreamProxy(super.open(p));
711        openedReaders.add(is);
712        return is;
713      }
714
715      @Override
716      public FSDataInputStream open(Path p, int blockSize) throws IOException {
717        InputStreamProxy is = new InputStreamProxy(super.open(p, blockSize));
718        openedReaders.add(is);
719        return is;
720      }
721    };
722
723    final TableDescriptor htd =
724      TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
725        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
726    final RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
727
728    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
729    for (byte[] fam : htd.getColumnFamilyNames()) {
730      scopes.put(fam, 0);
731    }
732    byte[] row = Bytes.toBytes("row");
733    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
734    // Write one column in one edit.
735    WALEdit cols = new WALEdit();
736    cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes("0"),
737      EnvironmentEdgeManager.currentTime(), new byte[] { 0 }));
738    final WAL log = customFactory.getWAL(hri);
739    final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
740      htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
741    // Sync the edit to the WAL
742    log.sync(txid);
743    log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
744    log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
745    log.shutdown();
746
747    // Inject our failure, object is constructed via reflection.
748    BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true);
749
750    // Now open a reader on the log which will throw an exception when
751    // we try to instantiate the custom Codec.
752    Path filename = AbstractFSWALProvider.getCurrentFileName(log);
753    assertThrows("Expected to see an exception when creating WAL reader", IOException.class,
754      () -> customFactory.createStreamReader(proxyFs, filename));
755    // We should have exactly one reader
756    assertEquals(1, openedReaders.size());
757    // And that reader should be closed.
758    long unclosedReaders =
759      openedReaders.stream().filter((r) -> !r.isClosed.get()).collect(Collectors.counting());
760    assertEquals("Should not find any open readers", 0, unclosedReaders);
761  }
762
763  /**
764   * A proxy around FSDataInputStream which can report if close() was called.
765   */
766  private static class InputStreamProxy extends FSDataInputStream {
767    private final InputStream real;
768    private final AtomicBoolean isClosed = new AtomicBoolean(false);
769
770    public InputStreamProxy(InputStream real) {
771      super(real);
772      this.real = real;
773    }
774
775    @Override
776    public void close() throws IOException {
777      isClosed.set(true);
778      real.close();
779    }
780  }
781
782  /**
783   * A custom WALCellCodec in which we can inject failure.
784   */
785  @SuppressWarnings("unused")
786  private static class BrokenWALCellCodec extends WALCellCodec {
787    static final AtomicBoolean THROW_FAILURE_ON_INIT = new AtomicBoolean(false);
788
789    static void maybeInjectFailure() {
790      if (THROW_FAILURE_ON_INIT.get()) {
791        throw new RuntimeException("Injected instantiation exception");
792      }
793    }
794
795    public BrokenWALCellCodec() {
796      super();
797      maybeInjectFailure();
798    }
799
800    public BrokenWALCellCodec(Configuration conf, CompressionContext compression) {
801      super(conf, compression);
802      maybeInjectFailure();
803    }
804  }
805}