001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.apache.hadoop.hbase.wal.WALFactory.META_WAL_PROVIDER; 021import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNotNull; 025import static org.junit.Assert.assertThrows; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028 029import java.io.IOException; 030import java.io.InputStream; 031import java.lang.reflect.Method; 032import java.net.BindException; 033import java.util.ArrayList; 034import java.util.List; 035import java.util.NavigableMap; 036import java.util.TreeMap; 037import java.util.concurrent.atomic.AtomicBoolean; 038import java.util.stream.Collectors; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.fs.FSDataInputStream; 041import org.apache.hadoop.fs.FSDataOutputStream; 042import org.apache.hadoop.fs.FileStatus; 043import org.apache.hadoop.fs.FileSystem; 044import org.apache.hadoop.fs.Path; 045import org.apache.hadoop.hbase.Cell; 046import org.apache.hadoop.hbase.CellUtil; 047import org.apache.hadoop.hbase.Coprocessor; 048import org.apache.hadoop.hbase.HBaseClassTestRule; 049import org.apache.hadoop.hbase.HBaseTestingUtility; 050import org.apache.hadoop.hbase.HConstants; 051import org.apache.hadoop.hbase.KeyValue; 052import org.apache.hadoop.hbase.ServerName; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 055import org.apache.hadoop.hbase.client.RegionInfo; 056import org.apache.hadoop.hbase.client.RegionInfoBuilder; 057import org.apache.hadoop.hbase.client.TableDescriptor; 058import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 059import org.apache.hadoop.hbase.codec.Codec; 060import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 061import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor; 062import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 063import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; 064import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 065import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; 066import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; 067import org.apache.hadoop.hbase.testclassification.MediumTests; 068import org.apache.hadoop.hbase.testclassification.RegionServerTests; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.util.CommonFSUtils; 071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 072import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; 073import org.apache.hadoop.hbase.util.Threads; 074import org.apache.hadoop.hbase.wal.WALFactory.Providers; 075import org.apache.hadoop.hdfs.DistributedFileSystem; 076import org.apache.hadoop.hdfs.MiniDFSCluster; 077import org.apache.hadoop.hdfs.protocol.HdfsConstants; 078import org.junit.After; 079import org.junit.AfterClass; 080import org.junit.Before; 081import org.junit.BeforeClass; 082import org.junit.ClassRule; 083import org.junit.Rule; 084import org.junit.Test; 085import org.junit.experimental.categories.Category; 086import org.junit.rules.TestName; 087import org.slf4j.Logger; 088import org.slf4j.LoggerFactory; 089 090/** 091 * WAL tests that can be reused across providers. 092 */ 093@Category({ RegionServerTests.class, MediumTests.class }) 094public class TestWALFactory { 095 096 @ClassRule 097 public static final HBaseClassTestRule CLASS_RULE = 098 HBaseClassTestRule.forClass(TestWALFactory.class); 099 100 private static final Logger LOG = LoggerFactory.getLogger(TestWALFactory.class); 101 102 protected static Configuration conf; 103 private static MiniDFSCluster cluster; 104 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 105 protected static Path hbaseDir; 106 protected static Path hbaseWALDir; 107 108 protected FileSystem fs; 109 protected Path dir; 110 protected WALFactory wals; 111 private ServerName currentServername; 112 113 @Rule 114 public final TestName currentTest = new TestName(); 115 116 @Before 117 public void setUp() throws Exception { 118 fs = cluster.getFileSystem(); 119 dir = new Path(hbaseDir, currentTest.getMethodName()); 120 this.currentServername = ServerName.valueOf(currentTest.getMethodName(), 16010, 1); 121 wals = new WALFactory(conf, this.currentServername.toString()); 122 } 123 124 @After 125 public void tearDown() throws Exception { 126 // testAppendClose closes the FileSystem, which will prevent us from closing cleanly here. 127 try { 128 wals.close(); 129 } catch (IOException exception) { 130 LOG.warn("Encountered exception while closing wal factory. If you have other errors, this" 131 + " may be the cause. Message: " + exception); 132 LOG.debug("Exception details for failure to close wal factory.", exception); 133 } 134 FileStatus[] entries = fs.listStatus(new Path("/")); 135 for (FileStatus dir : entries) { 136 fs.delete(dir.getPath(), true); 137 } 138 } 139 140 @BeforeClass 141 public static void setUpBeforeClass() throws Exception { 142 CommonFSUtils.setWALRootDir(TEST_UTIL.getConfiguration(), new Path("file:///tmp/wal")); 143 // Make block sizes small. 144 TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024); 145 // needed for testAppendClose() 146 // quicker heartbeat interval for faster DN death notification 147 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000); 148 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); 149 TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000); 150 151 // faster failover with cluster.shutdown();fs.close() idiom 152 TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1); 153 TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1); 154 TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500); 155 TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000); 156 TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000); 157 TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, 158 SampleRegionWALCoprocessor.class.getName()); 159 TEST_UTIL.startMiniDFSCluster(3); 160 161 conf = TEST_UTIL.getConfiguration(); 162 cluster = TEST_UTIL.getDFSCluster(); 163 164 hbaseDir = TEST_UTIL.createRootDir(); 165 hbaseWALDir = TEST_UTIL.createWALRootDir(); 166 } 167 168 @AfterClass 169 public static void tearDownAfterClass() throws Exception { 170 TEST_UTIL.shutdownMiniCluster(); 171 } 172 173 @Test 174 public void canCloseSingleton() throws IOException { 175 WALFactory.getInstance(conf).close(); 176 } 177 178 /** 179 * Just write multiple logs then split. Before fix for HADOOP-2283, this would fail. 180 */ 181 @Test 182 public void testSplit() throws IOException { 183 final TableName tableName = TableName.valueOf(currentTest.getMethodName()); 184 final byte[] rowName = tableName.getName(); 185 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 186 final int howmany = 3; 187 RegionInfo[] infos = new RegionInfo[3]; 188 Path tableDataDir = CommonFSUtils.getTableDir(hbaseDir, tableName); 189 fs.mkdirs(tableDataDir); 190 Path tabledir = CommonFSUtils.getWALTableDir(conf, tableName); 191 fs.mkdirs(tabledir); 192 for (int i = 0; i < howmany; i++) { 193 infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i)) 194 .setEndKey(Bytes.toBytes("" + (i + 1))).build(); 195 fs.mkdirs(new Path(tabledir, infos[i].getEncodedName())); 196 fs.mkdirs(new Path(tableDataDir, infos[i].getEncodedName())); 197 LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString()); 198 } 199 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 200 scopes.put(Bytes.toBytes("column"), 0); 201 202 // Add edits for three regions. 203 for (int ii = 0; ii < howmany; ii++) { 204 for (int i = 0; i < howmany; i++) { 205 final WAL log = wals.getWAL(infos[i]); 206 for (int j = 0; j < howmany; j++) { 207 WALEdit edit = new WALEdit(); 208 byte[] family = Bytes.toBytes("column"); 209 byte[] qualifier = Bytes.toBytes(Integer.toString(j)); 210 byte[] column = Bytes.toBytes("column:" + Integer.toString(j)); 211 edit.add( 212 new KeyValue(rowName, family, qualifier, EnvironmentEdgeManager.currentTime(), column)); 213 LOG.info("Region " + i + ": " + edit); 214 WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName, 215 EnvironmentEdgeManager.currentTime(), mvcc, scopes); 216 log.appendData(infos[i], walKey, edit); 217 walKey.getWriteEntry(); 218 } 219 log.sync(); 220 log.rollWriter(true); 221 } 222 } 223 wals.shutdown(); 224 // The below calculation of logDir relies on insider information... WALSplitter should be 225 // connected better 226 // with the WAL system.... not requiring explicit path. The oldLogDir is just made up not used. 227 Path logDir = new Path(new Path(hbaseWALDir, HConstants.HREGION_LOGDIR_NAME), 228 this.currentServername.toString()); 229 Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); 230 List<Path> splits = WALSplitter.split(hbaseWALDir, logDir, oldLogDir, fs, conf, wals); 231 verifySplits(splits, howmany); 232 } 233 234 /** 235 * Test new HDFS-265 sync. 236 */ 237 @Test 238 public void Broken_testSync() throws Exception { 239 TableName tableName = TableName.valueOf(currentTest.getMethodName()); 240 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 241 // First verify that using streams all works. 242 Path p = new Path(dir, currentTest.getMethodName() + ".fsdos"); 243 FSDataOutputStream out = fs.create(p); 244 out.write(tableName.getName()); 245 Method syncMethod = null; 246 try { 247 syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {}); 248 } catch (NoSuchMethodException e) { 249 try { 250 syncMethod = out.getClass().getMethod("sync", new Class<?>[] {}); 251 } catch (NoSuchMethodException ex) { 252 fail("This version of Hadoop supports neither Syncable.sync() " + "nor Syncable.hflush()."); 253 } 254 } 255 syncMethod.invoke(out, new Object[] {}); 256 FSDataInputStream in = fs.open(p); 257 assertTrue(in.available() > 0); 258 byte[] buffer = new byte[1024]; 259 int read = in.read(buffer); 260 assertEquals(tableName.getName().length, read); 261 out.close(); 262 in.close(); 263 264 final int total = 20; 265 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 266 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 267 scopes.put(tableName.getName(), 0); 268 final WAL wal = wals.getWAL(info); 269 270 for (int i = 0; i < total; i++) { 271 WALEdit kvs = new WALEdit(); 272 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 273 wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 274 EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs); 275 } 276 // Now call sync and try reading. Opening a Reader before you sync just 277 // gives you EOFE. 278 wal.sync(); 279 // Open a Reader. 280 Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); 281 int count = NoEOFWALStreamReader.count(wals, fs, walPath); 282 assertEquals(total, count); 283 // Add test that checks to see that an open of a Reader works on a file 284 // that has had a sync done on it. 285 for (int i = 0; i < total; i++) { 286 WALEdit kvs = new WALEdit(); 287 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 288 wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 289 EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs); 290 } 291 wal.sync(); 292 count = NoEOFWALStreamReader.count(wals, fs, walPath); 293 assertTrue(count >= total); 294 // If I sync, should see double the edits. 295 wal.sync(); 296 count = NoEOFWALStreamReader.count(wals, fs, walPath); 297 assertEquals(total * 2, count); 298 // Now do a test that ensures stuff works when we go over block boundary, 299 // especially that we return good length on file. 300 final byte[] value = new byte[1025 * 1024]; // Make a 1M value. 301 for (int i = 0; i < total; i++) { 302 WALEdit kvs = new WALEdit(); 303 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value)); 304 wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 305 EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs); 306 } 307 // Now I should have written out lots of blocks. Sync then read. 308 wal.sync(); 309 count = NoEOFWALStreamReader.count(wals, fs, walPath); 310 assertEquals(total * 3, count); 311 // shutdown and ensure that Reader gets right length also. 312 wal.shutdown(); 313 count = NoEOFWALStreamReader.count(wals, fs, walPath); 314 assertEquals(total * 3, count); 315 } 316 317 private void verifySplits(final List<Path> splits, final int howmany) throws IOException { 318 assertEquals(howmany * howmany, splits.size()); 319 for (int i = 0; i < splits.size(); i++) { 320 LOG.info("Verifying=" + splits.get(i)); 321 try (WALStreamReader reader = wals.createStreamReader(fs, splits.get(i))) { 322 int count = 0; 323 String previousRegion = null; 324 long seqno = -1; 325 WAL.Entry entry = new WAL.Entry(); 326 while ((entry = reader.next(entry)) != null) { 327 WALKey key = entry.getKey(); 328 String region = Bytes.toString(key.getEncodedRegionName()); 329 // Assert that all edits are for same region. 330 if (previousRegion != null) { 331 assertEquals(previousRegion, region); 332 } 333 LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getSequenceId()); 334 assertTrue(seqno < key.getSequenceId()); 335 seqno = key.getSequenceId(); 336 previousRegion = region; 337 count++; 338 } 339 assertEquals(howmany, count); 340 } 341 } 342 } 343 344 /* 345 * We pass different values to recoverFileLease() so that different code paths are covered For 346 * this test to pass, requires: 1. HDFS-200 (append support) 2. HDFS-988 (SafeMode should freeze 347 * file operations [FSNamesystem.nextGenerationStampForBlock]) 3. HDFS-142 (on restart, maintain 348 * pendingCreates) 349 */ 350 @Test 351 public void testAppendClose() throws Exception { 352 TableName tableName = TableName.valueOf(currentTest.getMethodName()); 353 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build(); 354 355 WAL wal = wals.getWAL(regionInfo); 356 int total = 20; 357 358 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 359 scopes.put(tableName.getName(), 0); 360 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 361 for (int i = 0; i < total; i++) { 362 WALEdit kvs = new WALEdit(); 363 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 364 wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, 365 EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs); 366 } 367 // Now call sync to send the data to HDFS datanodes 368 wal.sync(); 369 int namenodePort = cluster.getNameNodePort(); 370 final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); 371 372 // Stop the cluster. (ensure restart since we're sharing MiniDFSCluster) 373 try { 374 DistributedFileSystem dfs = cluster.getFileSystem(); 375 dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER); 376 TEST_UTIL.shutdownMiniDFSCluster(); 377 try { 378 // wal.writer.close() will throw an exception, 379 // but still call this since it closes the LogSyncer thread first 380 wal.shutdown(); 381 } catch (IOException e) { 382 LOG.info(e.toString(), e); 383 } 384 fs.close(); // closing FS last so DFSOutputStream can't call close 385 LOG.info("STOPPED first instance of the cluster"); 386 } finally { 387 // Restart the cluster 388 while (cluster.isClusterUp()) { 389 LOG.error("Waiting for cluster to go down"); 390 Thread.sleep(1000); 391 } 392 assertFalse(cluster.isClusterUp()); 393 cluster = null; 394 for (int i = 0; i < 100; i++) { 395 try { 396 cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort); 397 break; 398 } catch (BindException e) { 399 LOG.info("Sleeping. BindException bringing up new cluster"); 400 Threads.sleep(1000); 401 } 402 } 403 cluster.waitActive(); 404 fs = cluster.getFileSystem(); 405 LOG.info("STARTED second instance."); 406 } 407 408 // set the lease period to be 1 second so that the 409 // namenode triggers lease recovery upon append request 410 Method setLeasePeriod = 411 cluster.getClass().getDeclaredMethod("setLeasePeriod", new Class[] { Long.TYPE, Long.TYPE }); 412 setLeasePeriod.setAccessible(true); 413 setLeasePeriod.invoke(cluster, 1000L, 1000L); 414 try { 415 Thread.sleep(1000); 416 } catch (InterruptedException e) { 417 LOG.info(e.toString(), e); 418 } 419 420 // Now try recovering the log, like the HMaster would do 421 final FileSystem recoveredFs = fs; 422 final Configuration rlConf = conf; 423 424 class RecoverLogThread extends Thread { 425 public Exception exception = null; 426 427 @Override 428 public void run() { 429 try { 430 RecoverLeaseFSUtils.recoverFileLease(recoveredFs, walPath, rlConf, null); 431 } catch (IOException e) { 432 exception = e; 433 } 434 } 435 } 436 437 RecoverLogThread t = new RecoverLogThread(); 438 t.start(); 439 // Timeout after 60 sec. Without correct patches, would be an infinite loop 440 t.join(60 * 1000); 441 if (t.isAlive()) { 442 t.interrupt(); 443 throw new Exception("Timed out waiting for WAL.recoverLog()"); 444 } 445 446 if (t.exception != null) throw t.exception; 447 448 // Make sure you can read all the content 449 int count = 0; 450 try (NoEOFWALStreamReader reader = NoEOFWALStreamReader.create(wals, fs, walPath)) { 451 WAL.Entry entry = new WAL.Entry(); 452 while (reader.next(entry) != null) { 453 count++; 454 assertTrue("Should be one KeyValue per WALEdit", entry.getEdit().getCells().size() == 1); 455 } 456 } 457 assertEquals(total, count); 458 459 // Reset the lease period 460 setLeasePeriod.invoke(cluster, new Object[] { 60000L, 3600000L }); 461 } 462 463 /** 464 * Tests that we can write out an edit, close, and then read it back in again. 465 */ 466 @Test 467 public void testEditAdd() throws IOException { 468 int colCount = 10; 469 TableDescriptor htd = 470 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName())) 471 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build(); 472 NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); 473 for (byte[] fam : htd.getColumnFamilyNames()) { 474 scopes.put(fam, 0); 475 } 476 byte[] row = Bytes.toBytes("row"); 477 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 478 479 // Write columns named 1, 2, 3, etc. and then values of single byte 480 // 1, 2, 3... 481 long timestamp = EnvironmentEdgeManager.currentTime(); 482 WALEdit cols = new WALEdit(); 483 for (int i = 0; i < colCount; i++) { 484 cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)), 485 timestamp, new byte[] { (byte) (i + '0') })); 486 } 487 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(row) 488 .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build(); 489 final WAL log = wals.getWAL(info); 490 491 final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), 492 htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols); 493 log.sync(txid); 494 log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); 495 log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); 496 log.shutdown(); 497 Path filename = AbstractFSWALProvider.getCurrentFileName(log); 498 // Now open a reader on the log and assert append worked. 499 try (NoEOFWALStreamReader reader = NoEOFWALStreamReader.create(wals, fs, filename)) { 500 // Above we added all columns on a single row so we only read one 501 // entry in the below... thats why we have '1'. 502 for (int i = 0; i < 1; i++) { 503 WAL.Entry entry = reader.next(null); 504 if (entry == null) break; 505 WALKey key = entry.getKey(); 506 WALEdit val = entry.getEdit(); 507 assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName())); 508 assertTrue(htd.getTableName().equals(key.getTableName())); 509 Cell cell = val.getCells().get(0); 510 assertTrue(Bytes.equals(row, 0, row.length, cell.getRowArray(), cell.getRowOffset(), 511 cell.getRowLength())); 512 assertEquals((byte) (i + '0'), CellUtil.cloneValue(cell)[0]); 513 LOG.info(key + " " + val); 514 } 515 } 516 } 517 518 @Test 519 public void testAppend() throws IOException { 520 int colCount = 10; 521 TableDescriptor htd = 522 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName())) 523 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build(); 524 NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); 525 for (byte[] fam : htd.getColumnFamilyNames()) { 526 scopes.put(fam, 0); 527 } 528 byte[] row = Bytes.toBytes("row"); 529 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 530 // Write columns named 1, 2, 3, etc. and then values of single byte 531 // 1, 2, 3... 532 long timestamp = EnvironmentEdgeManager.currentTime(); 533 WALEdit cols = new WALEdit(); 534 for (int i = 0; i < colCount; i++) { 535 cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)), 536 timestamp, new byte[] { (byte) (i + '0') })); 537 } 538 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 539 final WAL log = wals.getWAL(hri); 540 final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), 541 htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols); 542 log.sync(txid); 543 log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); 544 log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); 545 log.shutdown(); 546 Path filename = AbstractFSWALProvider.getCurrentFileName(log); 547 // Now open a reader on the log and assert append worked. 548 try (WALStreamReader reader = wals.createStreamReader(fs, filename)) { 549 WAL.Entry entry = reader.next(); 550 assertEquals(colCount, entry.getEdit().size()); 551 int idx = 0; 552 for (Cell val : entry.getEdit().getCells()) { 553 assertTrue( 554 Bytes.equals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName())); 555 assertTrue(htd.getTableName().equals(entry.getKey().getTableName())); 556 assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(), 557 val.getRowLength())); 558 assertEquals((byte) (idx + '0'), CellUtil.cloneValue(val)[0]); 559 System.out.println(entry.getKey() + " " + val); 560 idx++; 561 } 562 } 563 } 564 565 /** 566 * Test that we can visit entries before they are appended 567 */ 568 @Test 569 public void testVisitors() throws Exception { 570 final int COL_COUNT = 10; 571 final TableName tableName = TableName.valueOf(currentTest.getMethodName()); 572 final byte[] row = Bytes.toBytes("row"); 573 final DumbWALActionsListener visitor = new DumbWALActionsListener(); 574 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 575 long timestamp = EnvironmentEdgeManager.currentTime(); 576 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 577 scopes.put(Bytes.toBytes("column"), 0); 578 579 RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build(); 580 final WAL log = wals.getWAL(hri); 581 log.registerWALActionsListener(visitor); 582 for (int i = 0; i < COL_COUNT; i++) { 583 WALEdit cols = new WALEdit(); 584 cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)), 585 timestamp, new byte[] { (byte) (i + '0') })); 586 log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 587 EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols); 588 } 589 log.sync(); 590 assertEquals(COL_COUNT, visitor.increments); 591 log.unregisterWALActionsListener(visitor); 592 WALEdit cols = new WALEdit(); 593 cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(11)), 594 timestamp, new byte[] { (byte) (11 + '0') })); 595 log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 596 EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols); 597 log.sync(); 598 assertEquals(COL_COUNT, visitor.increments); 599 } 600 601 /** 602 * A loaded WAL coprocessor won't break existing WAL test cases. 603 */ 604 @Test 605 public void testWALCoprocessorLoaded() throws Exception { 606 // test to see whether the coprocessor is loaded or not. 607 WALCoprocessorHost host = wals.getWAL(null).getCoprocessorHost(); 608 Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class); 609 assertNotNull(c); 610 } 611 612 static class DumbWALActionsListener implements WALActionsListener { 613 int increments = 0; 614 615 @Override 616 public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) { 617 increments++; 618 } 619 } 620 621 @Test 622 public void testWALProviders() throws IOException { 623 Configuration conf = new Configuration(); 624 WALFactory walFactory = new WALFactory(conf, this.currentServername.toString()); 625 assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass()); 626 } 627 628 @Test 629 public void testOnlySetWALProvider() throws IOException { 630 Configuration conf = new Configuration(); 631 conf.set(WAL_PROVIDER, WALFactory.Providers.multiwal.name()); 632 WALFactory walFactory = new WALFactory(conf, this.currentServername.toString()); 633 634 assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getWALProvider().getClass()); 635 assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getMetaProvider().getClass()); 636 } 637 638 @Test 639 public void testOnlySetMetaWALProvider() throws IOException { 640 Configuration conf = new Configuration(); 641 conf.set(META_WAL_PROVIDER, WALFactory.Providers.asyncfs.name()); 642 WALFactory walFactory = new WALFactory(conf, this.currentServername.toString()); 643 644 assertEquals(WALFactory.Providers.defaultProvider.clazz, 645 walFactory.getWALProvider().getClass()); 646 assertEquals(WALFactory.Providers.asyncfs.clazz, walFactory.getMetaProvider().getClass()); 647 } 648 649 @Test 650 public void testDefaultProvider() throws IOException { 651 final Configuration conf = new Configuration(); 652 // AsyncFSWal is the default, we should be able to request any WAL. 653 final WALFactory normalWalFactory = new WALFactory(conf, this.currentServername.toString()); 654 Class<? extends WALProvider> fshLogProvider = 655 normalWalFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 656 assertEquals(Providers.filesystem.clazz, fshLogProvider); 657 658 // Imagine a world where MultiWAL is the default 659 final WALFactory customizedWalFactory = 660 new WALFactory(conf, this.currentServername.toString()) { 661 @Override 662 Providers getDefaultProvider() { 663 return Providers.multiwal; 664 } 665 }; 666 // If we don't specify a WALProvider, we should get the default implementation. 667 Class<? extends WALProvider> multiwalProviderClass = 668 customizedWalFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.multiwal.name()); 669 assertEquals(Providers.multiwal.clazz, multiwalProviderClass); 670 } 671 672 @Test 673 public void testCustomProvider() throws IOException { 674 final Configuration config = new Configuration(); 675 config.set(WALFactory.WAL_PROVIDER, IOTestProvider.class.getName()); 676 final WALFactory walFactory = new WALFactory(config, this.currentServername.toString()); 677 Class<? extends WALProvider> walProvider = 678 walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 679 assertEquals(IOTestProvider.class, walProvider); 680 WALProvider metaWALProvider = walFactory.getMetaProvider(); 681 assertEquals(IOTestProvider.class, metaWALProvider.getClass()); 682 } 683 684 @Test 685 public void testCustomMetaProvider() throws IOException { 686 final Configuration config = new Configuration(); 687 config.set(WALFactory.META_WAL_PROVIDER, IOTestProvider.class.getName()); 688 final WALFactory walFactory = new WALFactory(config, this.currentServername.toString()); 689 Class<? extends WALProvider> walProvider = 690 walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 691 assertEquals(Providers.filesystem.clazz, walProvider); 692 WALProvider metaWALProvider = walFactory.getMetaProvider(); 693 assertEquals(IOTestProvider.class, metaWALProvider.getClass()); 694 } 695 696 @Test 697 public void testReaderClosedOnBadCodec() throws IOException { 698 // Create our own Configuration and WALFactory to avoid breaking other test methods 699 Configuration confWithCodec = new Configuration(conf); 700 confWithCodec.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, BrokenWALCellCodec.class, 701 Codec.class); 702 WALFactory customFactory = new WALFactory(confWithCodec, this.currentServername.toString()); 703 704 // Hack a Proxy over the FileSystem so that we can track the InputStreams opened by 705 // the FileSystem and know if close() was called on those InputStreams. 706 List<InputStreamProxy> openedReaders = new ArrayList<>(); 707 FileSystemProxy proxyFs = new FileSystemProxy(fs) { 708 @Override 709 public FSDataInputStream open(Path p) throws IOException { 710 InputStreamProxy is = new InputStreamProxy(super.open(p)); 711 openedReaders.add(is); 712 return is; 713 } 714 715 @Override 716 public FSDataInputStream open(Path p, int blockSize) throws IOException { 717 InputStreamProxy is = new InputStreamProxy(super.open(p, blockSize)); 718 openedReaders.add(is); 719 return is; 720 } 721 }; 722 723 final TableDescriptor htd = 724 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName())) 725 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build(); 726 final RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 727 728 NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); 729 for (byte[] fam : htd.getColumnFamilyNames()) { 730 scopes.put(fam, 0); 731 } 732 byte[] row = Bytes.toBytes("row"); 733 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 734 // Write one column in one edit. 735 WALEdit cols = new WALEdit(); 736 cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes("0"), 737 EnvironmentEdgeManager.currentTime(), new byte[] { 0 })); 738 final WAL log = customFactory.getWAL(hri); 739 final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), 740 htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols); 741 // Sync the edit to the WAL 742 log.sync(txid); 743 log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); 744 log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); 745 log.shutdown(); 746 747 // Inject our failure, object is constructed via reflection. 748 BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true); 749 750 // Now open a reader on the log which will throw an exception when 751 // we try to instantiate the custom Codec. 752 Path filename = AbstractFSWALProvider.getCurrentFileName(log); 753 assertThrows("Expected to see an exception when creating WAL reader", IOException.class, 754 () -> customFactory.createStreamReader(proxyFs, filename)); 755 // We should have exactly one reader 756 assertEquals(1, openedReaders.size()); 757 // And that reader should be closed. 758 long unclosedReaders = 759 openedReaders.stream().filter((r) -> !r.isClosed.get()).collect(Collectors.counting()); 760 assertEquals("Should not find any open readers", 0, unclosedReaders); 761 } 762 763 /** 764 * A proxy around FSDataInputStream which can report if close() was called. 765 */ 766 private static class InputStreamProxy extends FSDataInputStream { 767 private final InputStream real; 768 private final AtomicBoolean isClosed = new AtomicBoolean(false); 769 770 public InputStreamProxy(InputStream real) { 771 super(real); 772 this.real = real; 773 } 774 775 @Override 776 public void close() throws IOException { 777 isClosed.set(true); 778 real.close(); 779 } 780 } 781 782 /** 783 * A custom WALCellCodec in which we can inject failure. 784 */ 785 @SuppressWarnings("unused") 786 private static class BrokenWALCellCodec extends WALCellCodec { 787 static final AtomicBoolean THROW_FAILURE_ON_INIT = new AtomicBoolean(false); 788 789 static void maybeInjectFailure() { 790 if (THROW_FAILURE_ON_INIT.get()) { 791 throw new RuntimeException("Injected instantiation exception"); 792 } 793 } 794 795 public BrokenWALCellCodec() { 796 super(); 797 maybeInjectFailure(); 798 } 799 800 public BrokenWALCellCodec(Configuration conf, CompressionContext compression) { 801 super(conf, compression); 802 maybeInjectFailure(); 803 } 804 } 805}