001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.apache.hadoop.hbase.wal.WALFactory.META_WAL_PROVIDER; 021import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNotNull; 025import static org.junit.Assert.assertNotSame; 026import static org.junit.Assert.assertThrows; 027import static org.junit.Assert.assertTrue; 028import static org.junit.Assert.fail; 029 030import java.io.IOException; 031import java.io.InputStream; 032import java.lang.reflect.Method; 033import java.net.BindException; 034import java.util.ArrayList; 035import java.util.List; 036import java.util.NavigableMap; 037import java.util.TreeMap; 038import java.util.concurrent.atomic.AtomicBoolean; 039import java.util.stream.Collectors; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FSDataInputStream; 042import org.apache.hadoop.fs.FSDataOutputStream; 043import org.apache.hadoop.fs.FileStatus; 044import org.apache.hadoop.fs.FileSystem; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.hbase.Cell; 047import org.apache.hadoop.hbase.CellUtil; 048import org.apache.hadoop.hbase.Coprocessor; 049import org.apache.hadoop.hbase.HBaseClassTestRule; 050import org.apache.hadoop.hbase.HBaseTestingUtil; 051import org.apache.hadoop.hbase.HConstants; 052import org.apache.hadoop.hbase.KeyValue; 053import org.apache.hadoop.hbase.ServerName; 054import org.apache.hadoop.hbase.TableName; 055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 056import org.apache.hadoop.hbase.client.RegionInfo; 057import org.apache.hadoop.hbase.client.RegionInfoBuilder; 058import org.apache.hadoop.hbase.client.TableDescriptor; 059import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 060import org.apache.hadoop.hbase.codec.Codec; 061import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 062import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor; 063import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 064import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; 065import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 066import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; 067import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; 068import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 069import org.apache.hadoop.hbase.testclassification.MediumTests; 070import org.apache.hadoop.hbase.testclassification.RegionServerTests; 071import org.apache.hadoop.hbase.util.Bytes; 072import org.apache.hadoop.hbase.util.CommonFSUtils; 073import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 074import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; 075import org.apache.hadoop.hbase.util.Threads; 076import org.apache.hadoop.hbase.wal.WALFactory.Providers; 077import org.apache.hadoop.hdfs.DistributedFileSystem; 078import org.apache.hadoop.hdfs.MiniDFSCluster; 079import org.apache.hadoop.hdfs.protocol.HdfsConstants; 080import org.junit.After; 081import org.junit.AfterClass; 082import org.junit.Before; 083import org.junit.BeforeClass; 084import org.junit.ClassRule; 085import org.junit.Rule; 086import org.junit.Test; 087import org.junit.experimental.categories.Category; 088import org.junit.rules.TestName; 089import org.slf4j.Logger; 090import org.slf4j.LoggerFactory; 091 092/** 093 * WAL tests that can be reused across providers. 094 */ 095@Category({ RegionServerTests.class, MediumTests.class }) 096public class TestWALFactory { 097 098 @ClassRule 099 public static final HBaseClassTestRule CLASS_RULE = 100 HBaseClassTestRule.forClass(TestWALFactory.class); 101 102 private static final Logger LOG = LoggerFactory.getLogger(TestWALFactory.class); 103 104 protected static Configuration conf; 105 private static MiniDFSCluster cluster; 106 protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 107 protected static Path hbaseDir; 108 protected static Path hbaseWALDir; 109 110 protected FileSystem fs; 111 protected Path dir; 112 protected WALFactory wals; 113 private ServerName currentServername; 114 115 @Rule 116 public final TestName currentTest = new TestName(); 117 118 @Before 119 public void setUp() throws Exception { 120 fs = cluster.getFileSystem(); 121 dir = new Path(hbaseDir, currentTest.getMethodName()); 122 this.currentServername = ServerName.valueOf(currentTest.getMethodName(), 16010, 1); 123 wals = new WALFactory(conf, this.currentServername.toString()); 124 } 125 126 @After 127 public void tearDown() throws Exception { 128 // testAppendClose closes the FileSystem, which will prevent us from closing cleanly here. 129 try { 130 wals.close(); 131 } catch (IOException exception) { 132 LOG.warn("Encountered exception while closing wal factory. If you have other errors, this" 133 + " may be the cause. Message: " + exception); 134 LOG.debug("Exception details for failure to close wal factory.", exception); 135 } 136 FileStatus[] entries = fs.listStatus(new Path("/")); 137 for (FileStatus dir : entries) { 138 fs.delete(dir.getPath(), true); 139 } 140 } 141 142 @BeforeClass 143 public static void setUpBeforeClass() throws Exception { 144 CommonFSUtils.setWALRootDir(TEST_UTIL.getConfiguration(), new Path("file:///tmp/wal")); 145 // Make block sizes small. 146 TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024); 147 // needed for testAppendClose() 148 // quicker heartbeat interval for faster DN death notification 149 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000); 150 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); 151 TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000); 152 153 // faster failover with cluster.shutdown();fs.close() idiom 154 TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1); 155 TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1); 156 TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500); 157 TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000); 158 TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000); 159 TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, 160 SampleRegionWALCoprocessor.class.getName()); 161 TEST_UTIL.startMiniDFSCluster(3); 162 163 conf = TEST_UTIL.getConfiguration(); 164 cluster = TEST_UTIL.getDFSCluster(); 165 166 hbaseDir = TEST_UTIL.createRootDir(); 167 hbaseWALDir = TEST_UTIL.createWALRootDir(); 168 } 169 170 @AfterClass 171 public static void tearDownAfterClass() throws Exception { 172 TEST_UTIL.shutdownMiniCluster(); 173 } 174 175 @Test 176 public void canCloseSingleton() throws IOException { 177 WALFactory.getInstance(conf).close(); 178 } 179 180 /** 181 * Just write multiple logs then split. Before fix for HADOOP-2283, this would fail. 182 */ 183 @Test 184 public void testSplit() throws IOException { 185 final TableName tableName = TableName.valueOf(currentTest.getMethodName()); 186 final byte[] rowName = tableName.getName(); 187 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 188 final int howmany = 3; 189 RegionInfo[] infos = new RegionInfo[3]; 190 Path tableDataDir = CommonFSUtils.getTableDir(hbaseDir, tableName); 191 fs.mkdirs(tableDataDir); 192 Path tabledir = CommonFSUtils.getWALTableDir(conf, tableName); 193 fs.mkdirs(tabledir); 194 for (int i = 0; i < howmany; i++) { 195 infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i)) 196 .setEndKey(Bytes.toBytes("" + (i + 1))).build(); 197 fs.mkdirs(new Path(tabledir, infos[i].getEncodedName())); 198 fs.mkdirs(new Path(tableDataDir, infos[i].getEncodedName())); 199 LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString()); 200 } 201 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 202 scopes.put(Bytes.toBytes("column"), 0); 203 204 // Add edits for three regions. 205 for (int ii = 0; ii < howmany; ii++) { 206 for (int i = 0; i < howmany; i++) { 207 final WAL log = wals.getWAL(infos[i]); 208 for (int j = 0; j < howmany; j++) { 209 WALEdit edit = new WALEdit(); 210 byte[] family = Bytes.toBytes("column"); 211 byte[] qualifier = Bytes.toBytes(Integer.toString(j)); 212 byte[] column = Bytes.toBytes("column:" + Integer.toString(j)); 213 edit.add( 214 new KeyValue(rowName, family, qualifier, EnvironmentEdgeManager.currentTime(), column)); 215 LOG.info("Region " + i + ": " + edit); 216 WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName, 217 EnvironmentEdgeManager.currentTime(), mvcc, scopes); 218 log.appendData(infos[i], walKey, edit); 219 walKey.getWriteEntry(); 220 } 221 log.sync(); 222 log.rollWriter(true); 223 } 224 } 225 wals.shutdown(); 226 // The below calculation of logDir relies on insider information... WALSplitter should be 227 // connected better 228 // with the WAL system.... not requiring explicit path. The oldLogDir is just made up not used. 229 Path logDir = new Path(new Path(hbaseWALDir, HConstants.HREGION_LOGDIR_NAME), 230 this.currentServername.toString()); 231 Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); 232 List<Path> splits = WALSplitter.split(hbaseWALDir, logDir, oldLogDir, fs, conf, wals); 233 verifySplits(splits, howmany); 234 } 235 236 /** 237 * Test new HDFS-265 sync. 238 */ 239 @Test 240 public void Broken_testSync() throws Exception { 241 TableName tableName = TableName.valueOf(currentTest.getMethodName()); 242 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 243 // First verify that using streams all works. 244 Path p = new Path(dir, currentTest.getMethodName() + ".fsdos"); 245 FSDataOutputStream out = fs.create(p); 246 out.write(tableName.getName()); 247 Method syncMethod = null; 248 try { 249 syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {}); 250 } catch (NoSuchMethodException e) { 251 try { 252 syncMethod = out.getClass().getMethod("sync", new Class<?>[] {}); 253 } catch (NoSuchMethodException ex) { 254 fail("This version of Hadoop supports neither Syncable.sync() " + "nor Syncable.hflush()."); 255 } 256 } 257 syncMethod.invoke(out, new Object[] {}); 258 FSDataInputStream in = fs.open(p); 259 assertTrue(in.available() > 0); 260 byte[] buffer = new byte[1024]; 261 int read = in.read(buffer); 262 assertEquals(tableName.getName().length, read); 263 out.close(); 264 in.close(); 265 266 final int total = 20; 267 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 268 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 269 scopes.put(tableName.getName(), 0); 270 final WAL wal = wals.getWAL(info); 271 272 for (int i = 0; i < total; i++) { 273 WALEdit kvs = new WALEdit(); 274 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 275 wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 276 EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs); 277 } 278 // Now call sync and try reading. Opening a Reader before you sync just 279 // gives you EOFE. 280 wal.sync(); 281 // Open a Reader. 282 Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); 283 int count = NoEOFWALStreamReader.count(wals, fs, walPath); 284 assertEquals(total, count); 285 // Add test that checks to see that an open of a Reader works on a file 286 // that has had a sync done on it. 287 for (int i = 0; i < total; i++) { 288 WALEdit kvs = new WALEdit(); 289 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 290 wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 291 EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs); 292 } 293 wal.sync(); 294 count = NoEOFWALStreamReader.count(wals, fs, walPath); 295 assertTrue(count >= total); 296 // If I sync, should see double the edits. 297 wal.sync(); 298 count = NoEOFWALStreamReader.count(wals, fs, walPath); 299 assertEquals(total * 2, count); 300 // Now do a test that ensures stuff works when we go over block boundary, 301 // especially that we return good length on file. 302 final byte[] value = new byte[1025 * 1024]; // Make a 1M value. 303 for (int i = 0; i < total; i++) { 304 WALEdit kvs = new WALEdit(); 305 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value)); 306 wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 307 EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs); 308 } 309 // Now I should have written out lots of blocks. Sync then read. 310 wal.sync(); 311 count = NoEOFWALStreamReader.count(wals, fs, walPath); 312 assertEquals(total * 3, count); 313 // shutdown and ensure that Reader gets right length also. 314 wal.shutdown(); 315 count = NoEOFWALStreamReader.count(wals, fs, walPath); 316 assertEquals(total * 3, count); 317 } 318 319 private void verifySplits(final List<Path> splits, final int howmany) throws IOException { 320 assertEquals(howmany * howmany, splits.size()); 321 for (int i = 0; i < splits.size(); i++) { 322 LOG.info("Verifying=" + splits.get(i)); 323 try (WALStreamReader reader = wals.createStreamReader(fs, splits.get(i))) { 324 int count = 0; 325 String previousRegion = null; 326 long seqno = -1; 327 WAL.Entry entry = new WAL.Entry(); 328 while ((entry = reader.next(entry)) != null) { 329 WALKey key = entry.getKey(); 330 String region = Bytes.toString(key.getEncodedRegionName()); 331 // Assert that all edits are for same region. 332 if (previousRegion != null) { 333 assertEquals(previousRegion, region); 334 } 335 LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getSequenceId()); 336 assertTrue(seqno < key.getSequenceId()); 337 seqno = key.getSequenceId(); 338 previousRegion = region; 339 count++; 340 } 341 assertEquals(howmany, count); 342 } 343 } 344 } 345 346 /* 347 * We pass different values to recoverFileLease() so that different code paths are covered For 348 * this test to pass, requires: 1. HDFS-200 (append support) 2. HDFS-988 (SafeMode should freeze 349 * file operations [FSNamesystem.nextGenerationStampForBlock]) 3. HDFS-142 (on restart, maintain 350 * pendingCreates) 351 */ 352 @Test 353 public void testAppendClose() throws Exception { 354 TableName tableName = TableName.valueOf(currentTest.getMethodName()); 355 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build(); 356 357 WAL wal = wals.getWAL(regionInfo); 358 int total = 20; 359 360 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 361 scopes.put(tableName.getName(), 0); 362 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 363 for (int i = 0; i < total; i++) { 364 WALEdit kvs = new WALEdit(); 365 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); 366 wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, 367 EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs); 368 } 369 // Now call sync to send the data to HDFS datanodes 370 wal.sync(); 371 int namenodePort = cluster.getNameNodePort(); 372 final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); 373 374 // Stop the cluster. (ensure restart since we're sharing MiniDFSCluster) 375 try { 376 DistributedFileSystem dfs = cluster.getFileSystem(); 377 dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER); 378 TEST_UTIL.shutdownMiniDFSCluster(); 379 try { 380 // wal.writer.close() will throw an exception, 381 // but still call this since it closes the LogSyncer thread first 382 wal.shutdown(); 383 } catch (IOException e) { 384 LOG.info(e.toString(), e); 385 } 386 fs.close(); // closing FS last so DFSOutputStream can't call close 387 LOG.info("STOPPED first instance of the cluster"); 388 } finally { 389 // Restart the cluster 390 while (cluster.isClusterUp()) { 391 LOG.error("Waiting for cluster to go down"); 392 Thread.sleep(1000); 393 } 394 assertFalse(cluster.isClusterUp()); 395 cluster = null; 396 for (int i = 0; i < 100; i++) { 397 try { 398 cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort); 399 break; 400 } catch (BindException e) { 401 LOG.info("Sleeping. BindException bringing up new cluster"); 402 Threads.sleep(1000); 403 } 404 } 405 cluster.waitActive(); 406 fs = cluster.getFileSystem(); 407 LOG.info("STARTED second instance."); 408 } 409 410 // set the lease period to be 1 second so that the 411 // namenode triggers lease recovery upon append request 412 Method setLeasePeriod = 413 cluster.getClass().getDeclaredMethod("setLeasePeriod", new Class[] { Long.TYPE, Long.TYPE }); 414 setLeasePeriod.setAccessible(true); 415 setLeasePeriod.invoke(cluster, 1000L, 1000L); 416 try { 417 Thread.sleep(1000); 418 } catch (InterruptedException e) { 419 LOG.info(e.toString(), e); 420 } 421 422 // Now try recovering the log, like the HMaster would do 423 final FileSystem recoveredFs = fs; 424 final Configuration rlConf = conf; 425 426 class RecoverLogThread extends Thread { 427 public Exception exception = null; 428 429 @Override 430 public void run() { 431 try { 432 RecoverLeaseFSUtils.recoverFileLease(recoveredFs, walPath, rlConf, null); 433 } catch (IOException e) { 434 exception = e; 435 } 436 } 437 } 438 439 RecoverLogThread t = new RecoverLogThread(); 440 t.start(); 441 // Timeout after 60 sec. Without correct patches, would be an infinite loop 442 t.join(60 * 1000); 443 if (t.isAlive()) { 444 t.interrupt(); 445 throw new Exception("Timed out waiting for WAL.recoverLog()"); 446 } 447 448 if (t.exception != null) throw t.exception; 449 450 // Make sure you can read all the content 451 int count = 0; 452 try (NoEOFWALStreamReader reader = NoEOFWALStreamReader.create(wals, fs, walPath)) { 453 WAL.Entry entry = new WAL.Entry(); 454 while (reader.next(entry) != null) { 455 count++; 456 assertTrue("Should be one KeyValue per WALEdit", entry.getEdit().getCells().size() == 1); 457 } 458 } 459 assertEquals(total, count); 460 461 // Reset the lease period 462 setLeasePeriod.invoke(cluster, new Object[] { 60000L, 3600000L }); 463 } 464 465 /** 466 * Tests that we can write out an edit, close, and then read it back in again. 467 */ 468 @Test 469 public void testEditAdd() throws IOException { 470 int colCount = 10; 471 TableDescriptor htd = 472 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName())) 473 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build(); 474 NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); 475 for (byte[] fam : htd.getColumnFamilyNames()) { 476 scopes.put(fam, 0); 477 } 478 byte[] row = Bytes.toBytes("row"); 479 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 480 481 // Write columns named 1, 2, 3, etc. and then values of single byte 482 // 1, 2, 3... 483 long timestamp = EnvironmentEdgeManager.currentTime(); 484 WALEdit cols = new WALEdit(); 485 for (int i = 0; i < colCount; i++) { 486 cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)), 487 timestamp, new byte[] { (byte) (i + '0') })); 488 } 489 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(row) 490 .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build(); 491 final WAL log = wals.getWAL(info); 492 493 final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), 494 htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols); 495 log.sync(txid); 496 log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); 497 log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); 498 log.shutdown(); 499 Path filename = AbstractFSWALProvider.getCurrentFileName(log); 500 // Now open a reader on the log and assert append worked. 501 try (NoEOFWALStreamReader reader = NoEOFWALStreamReader.create(wals, fs, filename)) { 502 // Above we added all columns on a single row so we only read one 503 // entry in the below... thats why we have '1'. 504 for (int i = 0; i < 1; i++) { 505 WAL.Entry entry = reader.next(null); 506 if (entry == null) break; 507 WALKey key = entry.getKey(); 508 WALEdit val = entry.getEdit(); 509 assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName())); 510 assertTrue(htd.getTableName().equals(key.getTableName())); 511 Cell cell = val.getCells().get(0); 512 assertTrue(Bytes.equals(row, 0, row.length, cell.getRowArray(), cell.getRowOffset(), 513 cell.getRowLength())); 514 assertEquals((byte) (i + '0'), CellUtil.cloneValue(cell)[0]); 515 LOG.info(key + " " + val); 516 } 517 } 518 } 519 520 @Test 521 public void testAppend() throws IOException { 522 int colCount = 10; 523 TableDescriptor htd = 524 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName())) 525 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build(); 526 NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); 527 for (byte[] fam : htd.getColumnFamilyNames()) { 528 scopes.put(fam, 0); 529 } 530 byte[] row = Bytes.toBytes("row"); 531 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 532 // Write columns named 1, 2, 3, etc. and then values of single byte 533 // 1, 2, 3... 534 long timestamp = EnvironmentEdgeManager.currentTime(); 535 WALEdit cols = new WALEdit(); 536 for (int i = 0; i < colCount; i++) { 537 cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)), 538 timestamp, new byte[] { (byte) (i + '0') })); 539 } 540 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 541 final WAL log = wals.getWAL(hri); 542 final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), 543 htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols); 544 log.sync(txid); 545 log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); 546 log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); 547 log.shutdown(); 548 Path filename = AbstractFSWALProvider.getCurrentFileName(log); 549 // Now open a reader on the log and assert append worked. 550 try (WALStreamReader reader = wals.createStreamReader(fs, filename)) { 551 WAL.Entry entry = reader.next(); 552 assertEquals(colCount, entry.getEdit().size()); 553 int idx = 0; 554 for (Cell val : entry.getEdit().getCells()) { 555 assertTrue( 556 Bytes.equals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName())); 557 assertTrue(htd.getTableName().equals(entry.getKey().getTableName())); 558 assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(), 559 val.getRowLength())); 560 assertEquals((byte) (idx + '0'), CellUtil.cloneValue(val)[0]); 561 System.out.println(entry.getKey() + " " + val); 562 idx++; 563 } 564 } 565 } 566 567 /** 568 * Test that we can visit entries before they are appended 569 */ 570 @Test 571 public void testVisitors() throws Exception { 572 final int COL_COUNT = 10; 573 final TableName tableName = TableName.valueOf(currentTest.getMethodName()); 574 final byte[] row = Bytes.toBytes("row"); 575 final DumbWALActionsListener visitor = new DumbWALActionsListener(); 576 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 577 long timestamp = EnvironmentEdgeManager.currentTime(); 578 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 579 scopes.put(Bytes.toBytes("column"), 0); 580 581 RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build(); 582 final WAL log = wals.getWAL(hri); 583 log.registerWALActionsListener(visitor); 584 for (int i = 0; i < COL_COUNT; i++) { 585 WALEdit cols = new WALEdit(); 586 cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)), 587 timestamp, new byte[] { (byte) (i + '0') })); 588 log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 589 EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols); 590 } 591 log.sync(); 592 assertEquals(COL_COUNT, visitor.increments); 593 log.unregisterWALActionsListener(visitor); 594 WALEdit cols = new WALEdit(); 595 cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(11)), 596 timestamp, new byte[] { (byte) (11 + '0') })); 597 log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 598 EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols); 599 log.sync(); 600 assertEquals(COL_COUNT, visitor.increments); 601 } 602 603 /** 604 * A loaded WAL coprocessor won't break existing WAL test cases. 605 */ 606 @Test 607 public void testWALCoprocessorLoaded() throws Exception { 608 // test to see whether the coprocessor is loaded or not. 609 WALCoprocessorHost host = wals.getWAL(null).getCoprocessorHost(); 610 Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class); 611 assertNotNull(c); 612 } 613 614 static class DumbWALActionsListener implements WALActionsListener { 615 int increments = 0; 616 617 @Override 618 public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) { 619 increments++; 620 } 621 } 622 623 @Test 624 public void testWALProviders() throws IOException { 625 Configuration conf = new Configuration(); 626 WALFactory walFactory = new WALFactory(conf, this.currentServername.toString()); 627 assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass()); 628 629 // if providers are not set and do not enable SyncReplicationWALProvider 630 walFactory = new WALFactory(conf, this.currentServername, null); 631 assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass()); 632 } 633 634 @Test 635 public void testOnlySetWALProvider() throws IOException { 636 Configuration conf = new Configuration(); 637 conf.set(WAL_PROVIDER, WALFactory.Providers.multiwal.name()); 638 WALFactory walFactory = new WALFactory(conf, this.currentServername.toString()); 639 // class of WALProvider and metaWALProvider are the same when metaWALProvider is not set 640 assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getWALProvider().getClass()); 641 assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getMetaProvider().getClass()); 642 } 643 644 @Test 645 public void testOnlySetMetaWALProvider() throws IOException { 646 Configuration conf = new Configuration(); 647 conf.set(META_WAL_PROVIDER, WALFactory.Providers.asyncfs.name()); 648 WALFactory walFactory = new WALFactory(conf, this.currentServername.toString()); 649 assertEquals(WALFactory.Providers.defaultProvider.clazz, 650 walFactory.getWALProvider().getClass()); 651 assertEquals(WALFactory.Providers.asyncfs.clazz, walFactory.getMetaProvider().getClass()); 652 } 653 654 @Test 655 public void testDefaultProvider() throws IOException { 656 final Configuration conf = new Configuration(); 657 // AsyncFSWal is the default, we should be able to request any WAL. 658 final WALFactory normalWalFactory = new WALFactory(conf, this.currentServername.toString()); 659 Class<? extends WALProvider> fshLogProvider = 660 normalWalFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 661 assertEquals(Providers.filesystem.clazz, fshLogProvider); 662 663 // Imagine a world where MultiWAL is the default 664 final WALFactory customizedWalFactory = 665 new WALFactory(conf, this.currentServername.toString()) { 666 @Override 667 Providers getDefaultProvider() { 668 return Providers.multiwal; 669 } 670 }; 671 // If we don't specify a WALProvider, we should get the default implementation. 672 Class<? extends WALProvider> multiwalProviderClass = 673 customizedWalFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.multiwal.name()); 674 assertEquals(Providers.multiwal.clazz, multiwalProviderClass); 675 } 676 677 @Test 678 public void testCustomProvider() throws IOException { 679 final Configuration config = new Configuration(); 680 config.set(WALFactory.WAL_PROVIDER, IOTestProvider.class.getName()); 681 final WALFactory walFactory = new WALFactory(config, this.currentServername.toString()); 682 Class<? extends WALProvider> walProvider = 683 walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 684 assertEquals(IOTestProvider.class, walProvider); 685 WALProvider metaWALProvider = walFactory.getMetaProvider(); 686 assertEquals(IOTestProvider.class, metaWALProvider.getClass()); 687 } 688 689 @Test 690 public void testCustomMetaProvider() throws IOException { 691 final Configuration config = new Configuration(); 692 config.set(WALFactory.META_WAL_PROVIDER, IOTestProvider.class.getName()); 693 final WALFactory walFactory = new WALFactory(config, this.currentServername.toString()); 694 Class<? extends WALProvider> walProvider = 695 walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 696 assertEquals(Providers.filesystem.clazz, walProvider); 697 WALProvider metaWALProvider = walFactory.getMetaProvider(); 698 assertEquals(IOTestProvider.class, metaWALProvider.getClass()); 699 } 700 701 @Test 702 public void testCustomReplicationProvider() throws IOException { 703 final Configuration config = new Configuration(); 704 config.set(WALFactory.REPLICATION_WAL_PROVIDER, IOTestProvider.class.getName()); 705 final WALFactory walFactory = new WALFactory(config, this.currentServername.toString()); 706 Class<? extends WALProvider> walProvider = 707 walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name()); 708 assertEquals(Providers.filesystem.clazz, walProvider); 709 WALProvider replicationWALProvider = walFactory.getReplicationProvider(); 710 assertEquals(IOTestProvider.class, replicationWALProvider.getClass()); 711 } 712 713 /** 714 * Confirm that we will use different WALs for hbase:meta and hbase:replication 715 */ 716 @Test 717 public void testDifferentWALs() throws IOException { 718 WAL normalWAL = wals.getWAL(null); 719 WAL metaWAL = wals.getWAL(RegionInfoBuilder.FIRST_META_REGIONINFO); 720 WAL replicationWAL = wals.getWAL(RegionInfoBuilder 721 .newBuilder(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT).build()); 722 assertNotSame(normalWAL, metaWAL); 723 assertNotSame(normalWAL, replicationWAL); 724 assertNotSame(metaWAL, replicationWAL); 725 } 726 727 @Test 728 public void testReaderClosedOnBadCodec() throws IOException { 729 // Create our own Configuration and WALFactory to avoid breaking other test methods 730 Configuration confWithCodec = new Configuration(conf); 731 confWithCodec.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, BrokenWALCellCodec.class, 732 Codec.class); 733 WALFactory customFactory = new WALFactory(confWithCodec, this.currentServername.toString()); 734 735 // Hack a Proxy over the FileSystem so that we can track the InputStreams opened by 736 // the FileSystem and know if close() was called on those InputStreams. 737 List<InputStreamProxy> openedReaders = new ArrayList<>(); 738 FileSystemProxy proxyFs = new FileSystemProxy(fs) { 739 @Override 740 public FSDataInputStream open(Path p) throws IOException { 741 InputStreamProxy is = new InputStreamProxy(super.open(p)); 742 openedReaders.add(is); 743 return is; 744 } 745 746 @Override 747 public FSDataInputStream open(Path p, int blockSize) throws IOException { 748 InputStreamProxy is = new InputStreamProxy(super.open(p, blockSize)); 749 openedReaders.add(is); 750 return is; 751 } 752 }; 753 754 final TableDescriptor htd = 755 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName())) 756 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build(); 757 final RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 758 759 NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); 760 for (byte[] fam : htd.getColumnFamilyNames()) { 761 scopes.put(fam, 0); 762 } 763 byte[] row = Bytes.toBytes("row"); 764 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); 765 // Write one column in one edit. 766 WALEdit cols = new WALEdit(); 767 cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes("0"), 768 EnvironmentEdgeManager.currentTime(), new byte[] { 0 })); 769 final WAL log = customFactory.getWAL(hri); 770 final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), 771 htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols); 772 // Sync the edit to the WAL 773 log.sync(txid); 774 log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); 775 log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); 776 log.shutdown(); 777 778 // Inject our failure, object is constructed via reflection. 779 BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true); 780 781 // Now open a reader on the log which will throw an exception when 782 // we try to instantiate the custom Codec. 783 Path filename = AbstractFSWALProvider.getCurrentFileName(log); 784 assertThrows("Expected to see an exception when creating WAL reader", IOException.class, 785 () -> customFactory.createStreamReader(proxyFs, filename)); 786 // We should have exactly one reader 787 assertEquals(1, openedReaders.size()); 788 // And that reader should be closed. 789 long unclosedReaders = 790 openedReaders.stream().filter((r) -> !r.isClosed.get()).collect(Collectors.counting()); 791 assertEquals("Should not find any open readers", 0, unclosedReaders); 792 } 793 794 /** 795 * A proxy around FSDataInputStream which can report if close() was called. 796 */ 797 private static class InputStreamProxy extends FSDataInputStream { 798 private final InputStream real; 799 private final AtomicBoolean isClosed = new AtomicBoolean(false); 800 801 public InputStreamProxy(InputStream real) { 802 super(real); 803 this.real = real; 804 } 805 806 @Override 807 public void close() throws IOException { 808 isClosed.set(true); 809 real.close(); 810 } 811 } 812 813 /** 814 * A custom WALCellCodec in which we can inject failure. 815 */ 816 @SuppressWarnings("unused") 817 private static class BrokenWALCellCodec extends WALCellCodec { 818 static final AtomicBoolean THROW_FAILURE_ON_INIT = new AtomicBoolean(false); 819 820 static void maybeInjectFailure() { 821 if (THROW_FAILURE_ON_INIT.get()) { 822 throw new RuntimeException("Injected instantiation exception"); 823 } 824 } 825 826 public BrokenWALCellCodec() { 827 super(); 828 maybeInjectFailure(); 829 } 830 831 public BrokenWALCellCodec(Configuration conf, CompressionContext compression) { 832 super(conf, compression); 833 maybeInjectFailure(); 834 } 835 } 836}