001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.List; 027import java.util.NavigableMap; 028import java.util.TreeMap; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.KeyValue; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.RegionInfo; 040import org.apache.hadoop.hbase.client.RegionInfoBuilder; 041import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader; 042import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader; 043import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 044import org.apache.hadoop.hbase.testclassification.MapReduceTests; 045import org.apache.hadoop.hbase.testclassification.MediumTests; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.hbase.util.CommonFSUtils; 048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 049import org.apache.hadoop.hbase.util.Threads; 050import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 051import org.apache.hadoop.hbase.wal.WAL; 052import org.apache.hadoop.hbase.wal.WALEdit; 053import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 054import org.apache.hadoop.hbase.wal.WALFactory; 055import org.apache.hadoop.hbase.wal.WALKey; 056import org.apache.hadoop.hbase.wal.WALKeyImpl; 057import org.apache.hadoop.mapreduce.InputSplit; 058import org.apache.hadoop.mapreduce.MapReduceTestUtil; 059import org.junit.AfterClass; 060import org.junit.Before; 061import org.junit.BeforeClass; 062import org.junit.ClassRule; 063import org.junit.Test; 064import org.junit.experimental.categories.Category; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068/** 069 * JUnit tests for the WALRecordReader 070 */ 071@Category({ MapReduceTests.class, MediumTests.class }) 072public class TestWALRecordReader { 073 074 @ClassRule 075 public static final HBaseClassTestRule CLASS_RULE = 076 HBaseClassTestRule.forClass(TestWALRecordReader.class); 077 078 private static final Logger LOG = LoggerFactory.getLogger(TestWALRecordReader.class); 079 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 080 private static Configuration conf; 081 private static FileSystem fs; 082 private static Path hbaseDir; 083 private static FileSystem walFs; 084 private static Path walRootDir; 085 // visible for TestHLogRecordReader 086 static final TableName tableName = TableName.valueOf(getName()); 087 private static final byte[] rowName = tableName.getName(); 088 // visible for TestHLogRecordReader 089 static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 090 private static final byte[] family = Bytes.toBytes("column"); 091 private static final byte[] value = Bytes.toBytes("value"); 092 private static Path logDir; 093 protected MultiVersionConcurrencyControl mvcc; 094 protected static NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 095 096 private static String getName() { 097 return "TestWALRecordReader"; 098 } 099 100 private static String getServerName() { 101 ServerName serverName = ServerName.valueOf("TestWALRecordReader", 1, 1); 102 return serverName.toString(); 103 } 104 105 @Before 106 public void setUp() throws Exception { 107 fs.delete(hbaseDir, true); 108 walFs.delete(walRootDir, true); 109 mvcc = new MultiVersionConcurrencyControl(); 110 } 111 112 @BeforeClass 113 public static void setUpBeforeClass() throws Exception { 114 // Make block sizes small. 115 conf = TEST_UTIL.getConfiguration(); 116 conf.setInt("dfs.blocksize", 1024 * 1024); 117 conf.setInt("dfs.replication", 1); 118 TEST_UTIL.startMiniDFSCluster(1); 119 120 conf = TEST_UTIL.getConfiguration(); 121 fs = TEST_UTIL.getDFSCluster().getFileSystem(); 122 123 hbaseDir = TEST_UTIL.createRootDir(); 124 walRootDir = TEST_UTIL.createWALRootDir(); 125 walFs = CommonFSUtils.getWALFileSystem(conf); 126 logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME); 127 } 128 129 @AfterClass 130 public static void tearDownAfterClass() throws Exception { 131 fs.delete(hbaseDir, true); 132 walFs.delete(walRootDir, true); 133 TEST_UTIL.shutdownMiniCluster(); 134 } 135 136 /** 137 * Test partial reads from the WALs based on passed time range. 138 */ 139 @Test 140 public void testPartialRead() throws Exception { 141 final WALFactory walfactory = new WALFactory(conf, getName()); 142 WAL log = walfactory.getWAL(info); 143 // This test depends on timestamp being millisecond based and the filename of the WAL also 144 // being millisecond based. 145 long ts = EnvironmentEdgeManager.currentTime(); 146 WALEdit edit = new WALEdit(); 147 WALEditInternalHelper.addExtendedCell(edit, 148 new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value)); 149 log.appendData(info, getWalKeyImpl(ts, scopes), edit); 150 edit = new WALEdit(); 151 WALEditInternalHelper.addExtendedCell(edit, 152 new KeyValue(rowName, family, Bytes.toBytes("2"), ts + 1, value)); 153 log.appendData(info, getWalKeyImpl(ts + 1, scopes), edit); 154 log.sync(); 155 Threads.sleep(10); 156 LOG.info("Before 1st WAL roll " + log.toString()); 157 log.rollWriter(); 158 LOG.info("Past 1st WAL roll " + log.toString()); 159 160 Thread.sleep(1); 161 long ts1 = EnvironmentEdgeManager.currentTime(); 162 163 edit = new WALEdit(); 164 WALEditInternalHelper.addExtendedCell(edit, 165 new KeyValue(rowName, family, Bytes.toBytes("3"), ts1 + 1, value)); 166 log.appendData(info, getWalKeyImpl(ts1 + 1, scopes), edit); 167 edit = new WALEdit(); 168 WALEditInternalHelper.addExtendedCell(edit, 169 new KeyValue(rowName, family, Bytes.toBytes("4"), ts1 + 2, value)); 170 log.appendData(info, getWalKeyImpl(ts1 + 2, scopes), edit); 171 log.sync(); 172 log.shutdown(); 173 walfactory.shutdown(); 174 LOG.info("Closed WAL " + log.toString()); 175 176 WALInputFormat input = new WALInputFormat(); 177 Configuration jobConf = new Configuration(conf); 178 jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString()); 179 jobConf.setLong(WALInputFormat.END_TIME_KEY, ts); 180 181 // Only 1st file is considered, and only its 1st entry is in-range. 182 List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); 183 assertEquals(1, splits.size()); 184 testSplit(splits.get(0), Bytes.toBytes("1")); 185 186 jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1); 187 splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); 188 assertEquals(2, splits.size()); 189 // Both entries from first file are in-range. 190 testSplit(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2")); 191 // Only the 1st entry from the 2nd file is in-range. 192 testSplit(splits.get(1), Bytes.toBytes("3")); 193 194 jobConf.setLong(WALInputFormat.START_TIME_KEY, ts + 1); 195 jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1); 196 splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); 197 assertEquals(1, splits.size()); 198 // Only the 1st entry from the 2nd file is in-range. 199 testSplit(splits.get(0), Bytes.toBytes("3")); 200 } 201 202 /** 203 * Test basic functionality 204 */ 205 @Test 206 public void testWALRecordReader() throws Exception { 207 final WALFactory walfactory = new WALFactory(conf, getName()); 208 WAL log = walfactory.getWAL(info); 209 byte[] value = Bytes.toBytes("value"); 210 WALEdit edit = new WALEdit(); 211 WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("1"), 212 EnvironmentEdgeManager.currentTime(), value)); 213 long txid = 214 log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit); 215 log.sync(txid); 216 217 Thread.sleep(1); // make sure 2nd log gets a later timestamp 218 long secondTs = EnvironmentEdgeManager.currentTime(); 219 log.rollWriter(); 220 221 edit = new WALEdit(); 222 WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("2"), 223 EnvironmentEdgeManager.currentTime(), value)); 224 txid = log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit); 225 log.sync(txid); 226 log.shutdown(); 227 walfactory.shutdown(); 228 long thirdTs = EnvironmentEdgeManager.currentTime(); 229 230 // should have 2 log files now 231 WALInputFormat input = new WALInputFormat(); 232 Configuration jobConf = new Configuration(conf); 233 jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString()); 234 235 // make sure both logs are found 236 List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); 237 assertEquals(2, splits.size()); 238 239 // should return exactly one KV 240 testSplit(splits.get(0), Bytes.toBytes("1")); 241 // same for the 2nd split 242 testSplit(splits.get(1), Bytes.toBytes("2")); 243 244 // now test basic time ranges: 245 246 // set an endtime, the 2nd log file can be ignored completely. 247 jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs - 1); 248 splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); 249 assertEquals(1, splits.size()); 250 testSplit(splits.get(0), Bytes.toBytes("1")); 251 252 // now set a start time 253 jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE); 254 jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs); 255 splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); 256 assertTrue(splits.isEmpty()); 257 } 258 259 /** 260 * Test WALRecordReader tolerance to moving WAL from active to archive directory 261 * @throws Exception exception 262 */ 263 @Test 264 public void testWALRecordReaderActiveArchiveTolerance() throws Exception { 265 final WALFactory walfactory = new WALFactory(conf, getName()); 266 WAL log = walfactory.getWAL(info); 267 byte[] value = Bytes.toBytes("value"); 268 WALEdit edit = new WALEdit(); 269 WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("1"), 270 EnvironmentEdgeManager.currentTime(), value)); 271 long txid = 272 log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit); 273 log.sync(txid); 274 275 Thread.sleep(10); // make sure 2nd edit gets a later timestamp 276 277 edit = new WALEdit(); 278 WALEditInternalHelper.addExtendedCell(edit, new KeyValue(rowName, family, Bytes.toBytes("2"), 279 EnvironmentEdgeManager.currentTime(), value)); 280 txid = log.appendData(info, getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit); 281 log.sync(txid); 282 log.shutdown(); 283 284 // should have 2 log entries now 285 WALInputFormat input = new WALInputFormat(); 286 Configuration jobConf = new Configuration(conf); 287 jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString()); 288 // make sure log is found 289 List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); 290 assertEquals(1, splits.size()); 291 WALInputFormat.WALSplit split = (WALInputFormat.WALSplit) splits.get(0); 292 LOG.debug("log=" + logDir + " file=" + split.getLogFileName()); 293 294 testSplitWithMovingWAL(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2")); 295 } 296 297 protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) { 298 return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes); 299 } 300 301 private WALRecordReader<WALKey> getReader() { 302 return new WALKeyRecordReader(); 303 } 304 305 /** 306 * Create a new reader from the split, and match the edits against the passed columns. 307 */ 308 private void testSplit(InputSplit split, byte[]... columns) throws Exception { 309 WALRecordReader<WALKey> reader = getReader(); 310 reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf)); 311 312 for (byte[] column : columns) { 313 assertTrue(reader.nextKeyValue()); 314 Cell cell = reader.getCurrentValue().getCells().get(0); 315 if ( 316 !Bytes.equals(column, 0, column.length, cell.getQualifierArray(), cell.getQualifierOffset(), 317 cell.getQualifierLength()) 318 ) { 319 assertTrue( 320 "expected [" + Bytes.toString(column) + "], actual [" + Bytes.toString( 321 cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]", 322 false); 323 } 324 } 325 assertFalse(reader.nextKeyValue()); 326 reader.close(); 327 } 328 329 /** 330 * Create a new reader from the split, match the edits against the passed columns, moving WAL to 331 * archive in between readings 332 */ 333 private void testSplitWithMovingWAL(InputSplit split, byte[] col1, byte[] col2) throws Exception { 334 WALRecordReader<WALKey> reader = getReader(); 335 reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf)); 336 337 assertTrue(reader.nextKeyValue()); 338 Cell cell = reader.getCurrentValue().getCells().get(0); 339 if ( 340 !Bytes.equals(col1, 0, col1.length, cell.getQualifierArray(), cell.getQualifierOffset(), 341 cell.getQualifierLength()) 342 ) { 343 assertTrue( 344 "expected [" + Bytes.toString(col1) + "], actual [" + Bytes.toString( 345 cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]", 346 false); 347 } 348 // Move log file to archive directory 349 // While WAL record reader is open 350 WALInputFormat.WALSplit split_ = (WALInputFormat.WALSplit) split; 351 Path logFile = new Path(split_.getLogFileName()); 352 Path archivedLogDir = getWALArchiveDir(conf); 353 Path archivedLogLocation = new Path(archivedLogDir, logFile.getName()); 354 assertNotEquals(split_.getLogFileName(), archivedLogLocation.toString()); 355 356 assertTrue(fs.rename(logFile, archivedLogLocation)); 357 assertTrue(fs.exists(archivedLogDir)); 358 assertFalse(fs.exists(logFile)); 359 // TODO: This is not behaving as expected. WALInputFormat#WALKeyRecordReader doesn't open 360 // TODO: the archivedLogLocation to read next key value. 361 assertTrue(reader.nextKeyValue()); 362 cell = reader.getCurrentValue().getCells().get(0); 363 if ( 364 !Bytes.equals(col2, 0, col2.length, cell.getQualifierArray(), cell.getQualifierOffset(), 365 cell.getQualifierLength()) 366 ) { 367 assertTrue( 368 "expected [" + Bytes.toString(col2) + "], actual [" + Bytes.toString( 369 cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]", 370 false); 371 } 372 reader.close(); 373 } 374 375 private Path getWALArchiveDir(Configuration conf) throws IOException { 376 Path rootDir = CommonFSUtils.getWALRootDir(conf); 377 String archiveDir = AbstractFSWALProvider.getWALArchiveDirectoryName(conf, getServerName()); 378 return new Path(rootDir, archiveDir); 379 } 380}