001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.List;
027import java.util.NavigableMap;
028import java.util.TreeMap;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.KeyValue;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.client.RegionInfoBuilder;
041import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
042import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
043import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
044import org.apache.hadoop.hbase.testclassification.MapReduceTests;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.CommonFSUtils;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.hadoop.hbase.util.Threads;
050import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
051import org.apache.hadoop.hbase.wal.WAL;
052import org.apache.hadoop.hbase.wal.WALEdit;
053import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
054import org.apache.hadoop.hbase.wal.WALFactory;
055import org.apache.hadoop.hbase.wal.WALKey;
056import org.apache.hadoop.hbase.wal.WALKeyImpl;
057import org.apache.hadoop.mapreduce.InputSplit;
058import org.apache.hadoop.mapreduce.MapReduceTestUtil;
059import org.junit.AfterClass;
060import org.junit.Before;
061import org.junit.BeforeClass;
062import org.junit.ClassRule;
063import org.junit.Test;
064import org.junit.experimental.categories.Category;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068/**
069 * JUnit tests for the WALRecordReader
070 */
071@Category({ MapReduceTests.class, MediumTests.class })
072public class TestWALRecordReader {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076    HBaseClassTestRule.forClass(TestWALRecordReader.class);
077
078  private static final Logger LOG = LoggerFactory.getLogger(TestWALRecordReader.class);
079  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
080  private static Configuration conf;
081  private static FileSystem fs;
082  private static Path hbaseDir;
083  private static FileSystem walFs;
084  private static Path walRootDir;
085  // visible for TestHLogRecordReader
086  static final TableName tableName = TableName.valueOf(getName());
087  private static final byte[] rowName = tableName.getName();
088  // visible for TestHLogRecordReader
089  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
090  private static final byte[] family = Bytes.toBytes("column");
091  private static final byte[] value = Bytes.toBytes("value");
092  private static Path logDir;
093  protected MultiVersionConcurrencyControl mvcc;
094  protected static NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
095
096  private static String getName() {
097    return "TestWALRecordReader";
098  }
099
100  private static String getServerName() {
101    ServerName serverName = ServerName.valueOf("TestWALRecordReader", 1, 1);
102    return serverName.toString();
103  }
104
105  @Before
106  public void setUp() throws Exception {
107    fs.delete(hbaseDir, true);
108    walFs.delete(walRootDir, true);
109    mvcc = new MultiVersionConcurrencyControl();
110  }
111
112  @BeforeClass
113  public static void setUpBeforeClass() throws Exception {
114    // Make block sizes small.
115    conf = TEST_UTIL.getConfiguration();
116    conf.setInt("dfs.blocksize", 1024 * 1024);
117    conf.setInt("dfs.replication", 1);
118    TEST_UTIL.startMiniDFSCluster(1);
119
120    conf = TEST_UTIL.getConfiguration();
121    fs = TEST_UTIL.getDFSCluster().getFileSystem();
122
123    hbaseDir = TEST_UTIL.createRootDir();
124    walRootDir = TEST_UTIL.createWALRootDir();
125    walFs = CommonFSUtils.getWALFileSystem(conf);
126    logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
127  }
128
129  @AfterClass
130  public static void tearDownAfterClass() throws Exception {
131    fs.delete(hbaseDir, true);
132    walFs.delete(walRootDir, true);
133    TEST_UTIL.shutdownMiniCluster();
134  }
135
136  /**
137   * Test partial reads from the WALs based on passed time range.
138   */
139  @Test
140  public void testPartialRead() throws Exception {
141    final WALFactory walfactory = new WALFactory(conf, getName());
142    WAL log = walfactory.getWAL(info);
143    // This test depends on timestamp being millisecond based and the filename of the WAL also
144    // being millisecond based.
145    long ts = EnvironmentEdgeManager.currentTime();
146    WALEdit edit = new WALEdit();
147    WALEditInternalHelper.addExtendedCell(edit,
148      new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
149    log.appendData(info, getWalKeyImpl(ts, scopes), edit);
150    edit = new WALEdit();
151    WALEditInternalHelper.addExtendedCell(edit,
152      new KeyValue(rowName, family, Bytes.toBytes("2"), ts + 1, value));
153    log.appendData(info, getWalKeyImpl(ts + 1, scopes), edit);
154    log.sync();
155    Threads.sleep(10);
156    LOG.info("Before 1st WAL roll " + log.toString());
157    log.rollWriter();
158    LOG.info("Past 1st WAL roll " + log.toString());
159
160    Thread.sleep(1);
161    long ts1 = EnvironmentEdgeManager.currentTime();
162
163    edit = new WALEdit();
164    WALEditInternalHelper.addExtendedCell(edit,
165      new KeyValue(rowName, family, Bytes.toBytes("3"), ts1 + 1, value));
166    log.appendData(info, getWalKeyImpl(ts1 + 1, scopes), edit);
167    edit = new WALEdit();
168    WALEditInternalHelper.addExtendedCell(edit,
169      new KeyValue(rowName, family, Bytes.toBytes("4"), ts1 + 2, value));
170    log.appendData(info, getWalKeyImpl(ts1 + 2, scopes), edit);
171    log.sync();
172    log.shutdown();
173    walfactory.shutdown();
174    LOG.info("Closed WAL " + log.toString());
175
176    WALInputFormat input = new WALInputFormat();
177    Configuration jobConf = new Configuration(conf);
178    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
179    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);
180
181    // Only 1st file is considered, and only its 1st entry is in-range.
182    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
183    assertEquals(1, splits.size());
184    testSplit(splits.get(0), Bytes.toBytes("1"));
185
186    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1);
187    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
188    assertEquals(2, splits.size());
189    // Both entries from first file are in-range.
190    testSplit(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
191    // Only the 1st entry from the 2nd file is in-range.
192    testSplit(splits.get(1), Bytes.toBytes("3"));
193
194    jobConf.setLong(WALInputFormat.START_TIME_KEY, ts + 1);
195    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1);
196    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
197    assertEquals(1, splits.size());
198    // Only the 1st entry from the 2nd file is in-range.
199    testSplit(splits.get(0), Bytes.toBytes("3"));
200  }
201
202  /**
203   * Test basic functionality
204   */
205  @Test
206  public void testWALRecordReader() throws Exception {
207    final WALFactory walfactory = new WALFactory(conf, getName());
208    WAL log = walfactory.getWAL(info);
209    byte[] value = Bytes.toBytes("value");
210    WALEdit edit = new WALEdit();
211    WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("1"),
212      EnvironmentEdgeManager.currentTime(), value));
213    long txid =
214      log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
215    log.sync(txid);
216
217    Thread.sleep(1); // make sure 2nd log gets a later timestamp
218    long secondTs = EnvironmentEdgeManager.currentTime();
219    log.rollWriter();
220
221    edit = new WALEdit();
222    WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("2"),
223      EnvironmentEdgeManager.currentTime(), value));
224    txid = log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
225    log.sync(txid);
226    log.shutdown();
227    walfactory.shutdown();
228    long thirdTs = EnvironmentEdgeManager.currentTime();
229
230    // should have 2 log files now
231    WALInputFormat input = new WALInputFormat();
232    Configuration jobConf = new Configuration(conf);
233    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
234
235    // make sure both logs are found
236    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
237    assertEquals(2, splits.size());
238
239    // should return exactly one KV
240    testSplit(splits.get(0), Bytes.toBytes("1"));
241    // same for the 2nd split
242    testSplit(splits.get(1), Bytes.toBytes("2"));
243
244    // now test basic time ranges:
245
246    // set an endtime, the 2nd log file can be ignored completely.
247    jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs - 1);
248    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
249    assertEquals(1, splits.size());
250    testSplit(splits.get(0), Bytes.toBytes("1"));
251
252    // now set a start time
253    jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
254    jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
255    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
256    assertTrue(splits.isEmpty());
257  }
258
259  /**
260   * Test WALRecordReader tolerance to moving WAL from active to archive directory
261   * @throws Exception exception
262   */
263  @Test
264  public void testWALRecordReaderActiveArchiveTolerance() throws Exception {
265    final WALFactory walfactory = new WALFactory(conf, getName());
266    WAL log = walfactory.getWAL(info);
267    byte[] value = Bytes.toBytes("value");
268    WALEdit edit = new WALEdit();
269    WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("1"),
270      EnvironmentEdgeManager.currentTime(), value));
271    long txid =
272      log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
273    log.sync(txid);
274
275    Thread.sleep(10); // make sure 2nd edit gets a later timestamp
276
277    edit = new WALEdit();
278    WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("2"),
279      EnvironmentEdgeManager.currentTime(), value));
280    txid = log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
281    log.sync(txid);
282    log.shutdown();
283
284    // should have 2 log entries now
285    WALInputFormat input = new WALInputFormat();
286    Configuration jobConf = new Configuration(conf);
287    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
288    // make sure log is found
289    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
290    assertEquals(1, splits.size());
291    WALInputFormat.WALSplit split = (WALInputFormat.WALSplit) splits.get(0);
292    LOG.debug("log=" + logDir + " file=" + split.getLogFileName());
293
294    testSplitWithMovingWAL(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
295  }
296
297  protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) {
298    return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
299  }
300
301  private WALRecordReader<WALKey> getReader() {
302    return new WALKeyRecordReader();
303  }
304
305  /**
306   * Create a new reader from the split, and match the edits against the passed columns.
307   */
308  private void testSplit(InputSplit split, byte[]... columns) throws Exception {
309    WALRecordReader<WALKey> reader = getReader();
310    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
311
312    for (byte[] column : columns) {
313      assertTrue(reader.nextKeyValue());
314      Cell cell = reader.getCurrentValue().getCells().get(0);
315      if (
316        !Bytes.equals(column, 0, column.length, cell.getQualifierArray(), cell.getQualifierOffset(),
317          cell.getQualifierLength())
318      ) {
319        assertTrue(
320          "expected [" + Bytes.toString(column) + "], actual [" + Bytes.toString(
321            cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]",
322          false);
323      }
324    }
325    assertFalse(reader.nextKeyValue());
326    reader.close();
327  }
328
329  /**
330   * Create a new reader from the split, match the edits against the passed columns, moving WAL to
331   * archive in between readings
332   */
333  private void testSplitWithMovingWAL(InputSplit split, byte[] col1, byte[] col2) throws Exception {
334    WALRecordReader<WALKey> reader = getReader();
335    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
336
337    assertTrue(reader.nextKeyValue());
338    Cell cell = reader.getCurrentValue().getCells().get(0);
339    if (
340      !Bytes.equals(col1, 0, col1.length, cell.getQualifierArray(), cell.getQualifierOffset(),
341        cell.getQualifierLength())
342    ) {
343      assertTrue(
344        "expected [" + Bytes.toString(col1) + "], actual [" + Bytes.toString(
345          cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]",
346        false);
347    }
348    // Move log file to archive directory
349    // While WAL record reader is open
350    WALInputFormat.WALSplit split_ = (WALInputFormat.WALSplit) split;
351    Path logFile = new Path(split_.getLogFileName());
352    Path archivedLogDir = getWALArchiveDir(conf);
353    Path archivedLogLocation = new Path(archivedLogDir, logFile.getName());
354    assertNotEquals(split_.getLogFileName(), archivedLogLocation.toString());
355
356    assertTrue(fs.rename(logFile, archivedLogLocation));
357    assertTrue(fs.exists(archivedLogDir));
358    assertFalse(fs.exists(logFile));
359    // TODO: This is not behaving as expected. WALInputFormat#WALKeyRecordReader doesn't open
360    // TODO: the archivedLogLocation to read next key value.
361    assertTrue(reader.nextKeyValue());
362    cell = reader.getCurrentValue().getCells().get(0);
363    if (
364      !Bytes.equals(col2, 0, col2.length, cell.getQualifierArray(), cell.getQualifierOffset(),
365        cell.getQualifierLength())
366    ) {
367      assertTrue(
368        "expected [" + Bytes.toString(col2) + "], actual [" + Bytes.toString(
369          cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]",
370        false);
371    }
372    reader.close();
373  }
374
375  private Path getWALArchiveDir(Configuration conf) throws IOException {
376    Path rootDir = CommonFSUtils.getWALRootDir(conf);
377    String archiveDir = AbstractFSWALProvider.getWALArchiveDirectoryName(conf, getServerName());
378    return new Path(rootDir, archiveDir);
379  }
380}