/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.getRowKey;
import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY;
import static org.apache.hadoop.hbase.wal.WALEdit.REPLICATION_MARKER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufWALStreamReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;

/**
 * Testing {@link WAL} splitting code.
 * <p>
 * A two-datanode mini DFS cluster is started once for the class; every test gets fresh root and
 * WAL root directories plus a new {@link WALFactory}, all of which are removed again in
 * {@link #tearDown()}.
 */
@Category({ RegionServerTests.class, LargeTests.class })
public class TestWALSplit {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWALSplit.class);
  private static final Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);

  /** Shared configuration of {@link #TEST_UTIL}; several tests mutate it. */
  private static Configuration conf;
  /** File system of the mini DFS cluster, fetched again in {@link #setUp()}. */
  private FileSystem fs;

  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  // Per-test paths, all (re)assigned in setUp().
  private Path HBASEDIR;
  private Path HBASELOGDIR;
  private Path WALDIR;
  private Path OLDLOGDIR;
  private Path CORRUPTDIR;
  private Path TABLEDIR;
  private String TMPDIRNAME;

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  private static final String FILENAME_BEING_SPLIT = "testfile";
  private static final TableName TABLE_NAME = TableName.valueOf("t1");
  private static final byte[] FAMILY = Bytes.toBytes("f1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  /** Region names edits are written for; reset to "bbb" and "ccc" before every test. */
  private static List<String> REGIONS = new ArrayList<>();
  /** Name of the fake user that performs the split in the zombie test. */
  private static String ROBBER;
  /** Name of the fake user that keeps writing in the zombie test. */
  private static String ZOMBIE;
  private static String[] GROUP = new String[] { "supergroup" };

  /** Kinds of damage handed to {@code corruptWAL} by the corruption tests. */
  enum Corruptions {
    INSERT_GARBAGE_ON_FIRST_LINE,
    INSERT_GARBAGE_IN_THE_MIDDLE,
    APPEND_GARBAGE,
    TRUNCATE,
    TRUNCATE_TRAILER
  }

  /**
   * Configures the filesystem WAL provider with the instrumented writer, registers the fake
   * robber/zombie users and starts the mini DFS cluster.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    conf.setClass(FSHLogProvider.WRITER_IMPL, InstrumentedLogWriter.class, Writer.class);
    // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
    // Create fake mapping user to group and set it to the conf.
    Map<String, String[]> u2g_map = new HashMap<>(2);
    ROBBER = User.getCurrent().getName() + "-robber";
    ZOMBIE = User.getCurrent().getName() + "-zombie";
    u2g_map.put(ROBBER, GROUP);
    u2g_map.put(ZOMBIE, GROUP);
    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
    conf.setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.startMiniDFSCluster(2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Rule
  public TestName name = new TestName();
  private WALFactory wals = null;

  /**
   * Creates fresh root/WAL directories, resets {@link #REGIONS} and the writer failure switch, and
   * builds a new {@link WALFactory} named after the running test method.
   */
  @Before
  public void setUp() throws Exception {
    LOG.info("Cleaning up cluster for new test.");
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    HBASEDIR = TEST_UTIL.createRootDir();
    HBASELOGDIR = TEST_UTIL.createWALRootDir();
    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
    TABLEDIR = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    TMPDIRNAME =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    REGIONS.clear();
    Collections.addAll(REGIONS, "bbb", "ccc");
    InstrumentedLogWriter.activateFailure = false;
    wals = new WALFactory(conf, name.getMethodName());
    WALDIR = new Path(HBASELOGDIR, AbstractFSWALProvider.getWALDirectoryName(ServerName
      .valueOf(name.getMethodName(), 16010, EnvironmentEdgeManager.currentTime()).toString()));
    // WALDIR is deliberately not created here; tests that need it to exist up front call
    // fs.mkdirs(WALDIR) themselves.
  }

  /**
   * Closes the factory (tolerating close failures) and removes the per-test directories.
   */
  @After
  public void tearDown() throws Exception {
    try {
      // wals stays null when setUp() failed before creating the factory; do not hide that
      // failure behind a NullPointerException.
      if (wals != null) {
        wals.close();
      }
    } catch (IOException exception) {
      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if"
        + " you see a failure look here.");
      LOG.debug("exception details", exception);
    } finally {
      wals = null;
      fs.delete(HBASEDIR, true);
      fs.delete(HBASELOGDIR, true);
    }
  }

  /**
   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
   * Ensures we do not lose edits.
   */
  @Test
  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
    final AtomicLong counter = new AtomicLong(0);
    AtomicBoolean stop = new AtomicBoolean(false);
    // Region we'll write edits to and then later examine to make sure they all made it in.
    final String region = REGIONS.get(0);
    final int numWriters = 3;
    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
    try {
      long startCount = counter.get();
      zombie.start();
      // Wait till writer starts going.
      while (startCount == counter.get()) {
        Threads.sleep(1);
      }
      // Give it a second to write a few appends.
      Threads.sleep(1000);
      final Configuration conf2 = HBaseConfiguration.create(conf);
      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
      // Split as a different user and count every edit that landed in the region's split files.
      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          StringBuilder ls =
            new StringBuilder("Contents of WALDIR (").append(WALDIR).append("):\n");
          for (FileStatus status : fs.listStatus(WALDIR)) {
            ls.append("\t").append(status.toString()).append("\n");
          }
          LOG.debug(Objects.toString(ls));
          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
          LOG.info("Finished splitting out from under zombie.");
          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
          int total = 0;
          for (Path logfile : logfiles) {
            total += countWAL(logfile);
          }
          return total;
        }
      });
      LOG.info("zombie=" + counter.get() + ", robber=" + count);
      assertTrue(
        "The log file could have at most 1 extra log entry, but can't have less. "
          + "Zombie could write " + counter.get() + " and logfile had only " + count,
        counter.get() == count || counter.get() + 1 == count);
    } finally {
      stop.set(true);
      zombie.interrupt();
      Threads.threadDumpingIsAlive(zombie);
    }
  }

  /**
   * This thread will keep writing to a 'wal' file even after the split process has started. It
   * simulates a region server that was considered dead but woke up and wrote some more to the last
   * log entry. Does its writing as an alternate user in another filesystem instance to simulate
   * better it being a regionserver.
   */
  private class ZombieLastLogWriterRegionServer extends Thread {
    /** Running count of edits written, shared with the test thread. */
    final AtomicLong editsCount;
    /** Set by the test to make {@link #loop(Writer)} exit. */
    final AtomicBoolean stop;
    final int numOfWriters;
    /**
     * Region to write edits for.
     */
    final String region;
    final User user;

    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
      final String region, final int writers) throws IOException, InterruptedException {
      super("ZombieLastLogWriterRegionServer");
      setDaemon(true);
      this.stop = stop;
      this.editsCount = counter;
      this.region = region;
      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
      numOfWriters = writers;
    }

    @Override
    public void run() {
      try {
        doWriting();
      } catch (IOException | InterruptedException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      }
    }

    /**
     * Generates the initial WALs as the zombie user, keeps appending to the last one until told to
     * stop, and then expects closing that writer to fail.
     */
    private void doWriting() throws IOException, InterruptedException {
      this.user.runAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose
          // index we supply here.
          int walToKeepOpen = numOfWriters - 1;
          // The below method writes numOfWriters files each with ENTRIES entries for a total of
          // numOfWriters * ENTRIES added per column family in the region.
          Writer writer = null;
          try {
            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
          } catch (IOException e1) {
            throw new RuntimeException("Failed", e1);
          }
          // Update counter so has all edits written so far.
          editsCount.addAndGet(numOfWriters * ENTRIES);
          loop(writer);
          // If we've been interrupted, then things should have shifted out from under us.
          // closing should error
          try {
            writer.close();
            fail("Writing closing after parsing should give an error.");
          } catch (IOException exception) {
            LOG.debug("ignoring error when closing final writer.", exception);
          }
          return null;
        }
      });
    }

    /**
     * Appends one edit at a time to {@code writer} until {@link #stop} is set or an append fails.
     */
    private void loop(final Writer writer) {
      byte[] regionBytes = Bytes.toBytes(this.region);
      while (!stop.get()) {
        try {
          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
            Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
          long count = editsCount.incrementAndGet();
          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
          try {
            Thread.sleep(1);
          } catch (InterruptedException ignored) {
            // Deliberately swallowed: the test sets the stop flag before interrupting this
            // thread, so the loop condition ends the loop on the next check.
          }
        } catch (IOException ex) {
          LOG.error(getName() + " ex " + ex.toString());
          if (ex instanceof RemoteException) {
            LOG.error("Juliet: got RemoteException " + ex.getMessage() + " while writing "
              + (editsCount.get() + 1));
          } else {
            LOG.error(getName() + " failed to write....at " + editsCount.get());
            // NOTE: the AssertionError raised here is thrown on this writer thread.
            fail("Failed to write " + editsCount.get());
          }
          break;
        } catch (Throwable t) {
          LOG.error(getName() + " HOW? " + t);
          LOG.debug("exception details", t);
          break;
        }
      }
      LOG.info(getName() + " Writer exiting");
    }
  }

  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-3020"
   */
  @Test
  public void testRecoveredEditsPathForMeta() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName(), parentOfParent);
  }

  /**
   * Test old recovered edits file doesn't break WALSplitter. This is useful in upgrading old
   * instances.
   */
  @Test
  public void testOldRecoveredEditsFileSidelined() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
    fs.createNewFile(parent); // create a recovered.edits file
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName(), parentOfParent);
    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
  }

  /**
   * @return the split-edits path of the first meta region for sequence id 1 and
   *         {@link #FILENAME_BEING_SPLIT}
   */
  private Path createRecoveredEditsPathForRegion() throws IOException {
    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    return WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1,
      FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
  }

  @Test
  public void testHasRecoveredEdits() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
    // Create a file named after the part of the edits file name before the first '-'.
    String renamedEdit = p.getName().split("-")[0];
    fs.createNewFile(new Path(p.getParent(), renamedEdit));
    assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
  }

  private void useDifferentDFSClient() throws IOException {
    // make fs act as a different client now
    // initialize will create a new DFSClient with a new client ID
    fs.initialize(fs.getUri(), conf);
  }

  @Test
  public void testSplitPreservesEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 0);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
  }

  @Test
  public void testSplitRemovesRegionEventsEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 100);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should only have the test edits
    assertEquals(10, countWAL(splitLog[0]));
  }

  @Test
  public void testSplitLeavesCompactionEventsEdits() throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    REGIONS.clear();
    REGIONS.add(hri.getEncodedName());
    Path regionDir =
      new Path(CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
    LOG.info("Creating region directory: " + regionDir);
    assertTrue(fs.mkdirs(regionDir));

    Writer writer = generateWALs(1, 10, 0, 10);
    String[] compactInputs = new String[] { "file1", "file2", "file3" };
    String compactOutput = "file4";
    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
    writer.close();

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    // original log should have 10 test edits, 10 region markers, 1 compaction marker
    assertEquals(21, countWAL(originalLog));

    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
    assertEquals(1, splitLog.length);

    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should have 10 test edits plus 1 compaction marker
    assertEquals(11, countWAL(splitLog[0]));
  }

  /**
   * Tests that WalSplitter ignores replication marker edits.
   */
  @Test
  public void testSplitRemovesReplicationMarkerEdits() throws IOException {
    RegionInfo regionInfo = ReplicationMarkerChore.REGION_INFO;
    Path path = new Path(WALDIR, WAL_FILE_PREFIX + "1");
    generateReplicationMarkerEdits(path, regionInfo);
    useDifferentDFSClient();
    List<FileStatus> logFiles =
      SplitLogManager.getFileList(conf, Collections.singletonList(WALDIR), null);
    assertEquals(1, logFiles.size());
    assertEquals(path, logFiles.get(0).getPath());
    List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    // Make sure that WALSplitter doesn't fail.
    assertEquals(0, splitPaths.size());
  }

  /**
   * Writes a WAL at {@code path} holding a single replication marker edit for {@code regionInfo}.
   */
  private void generateReplicationMarkerEdits(Path path, RegionInfo regionInfo) throws IOException {
    long timestamp = EnvironmentEdgeManager.currentTime();
    fs.mkdirs(WALDIR);
    try (Writer writer = wals.createWALWriter(fs, path)) {
      WALProtos.ReplicationMarkerDescriptor.Builder builder =
        WALProtos.ReplicationMarkerDescriptor.newBuilder();
      builder.setWalName("wal-name");
      builder.setRegionServerName("rs-name");
      builder.setOffset(0L);
      WALProtos.ReplicationMarkerDescriptor desc = builder.build();
      appendEntry(writer, REPLICATION_SINK_TRACKER_TABLE_NAME, regionInfo.getEncodedNameAsBytes(),
        getRowKey(desc.getRegionServerName(), timestamp), METAFAMILY, REPLICATION_MARKER, VALUE, 1);
    }
  }

  /**
   * Splits the WALs in {@link #WALDIR} and counts the entries recovered for each region in
   * {@link #REGIONS}.
   * @param expectedFiles   number of split files every region must end up with
   * @param expectedEntries -1 to not assert
   * @return the count across all regions
   */
  private int splitAndCount(final int expectedFiles, final int expectedEntries) throws IOException {
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    int result = 0;
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile : logfiles) {
        count += countWAL(logfile);
      }
      if (-1 != expectedEntries) {
        assertEquals(expectedEntries, count);
      }
      result += count;
    }
    return result;
  }

  @Test
  public void testEmptyLogFiles() throws IOException {
    testEmptyLogFiles(true);
  }

  @Test
  public void testEmptyOpenLogFiles() throws IOException {
    testEmptyLogFiles(false);
  }

  /**
   * Surrounds the generated WALs with two empty files and checks the split still yields every
   * entry.
   * @param close forwarded to {@code injectEmptyFile}
   */
  private void testEmptyLogFiles(final boolean close) throws IOException {
    // we won't create the hlog dir until getWAL got called, so
    // make dir here when testing empty log file
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", close);
    generateWALs(Integer.MAX_VALUE);
    injectEmptyFile("empty", close);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
  }

  @Test
  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
    // generate logs but leave wal.dat.5 open.
    generateWALs(5);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.APPEND_GARBAGE, true);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE,
      true);
    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); // 1 corrupt
  }

  @Test
  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE,
      false);
    // the entries in the original logs are alternating regions
    // considering the sequence file header, the middle corruption should
    // affect at least half of the entries
    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
    // Integer ceiling of ENTRIES / 2, minus one. (Math.ceil(ENTRIES / 2) would divide as
    // integers first, so the ceiling would never round up.)
    int firstHalfEntries = (ENTRIES + 1) / 2 - 1;
    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
    assertTrue("The file up to the corrupted area hasn't been parsed",
      REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
  }

  /**
   * For every failure type of the faulty reader except NONE, checks that the names found in
   * {@link #CORRUPTDIR} after the split equal the names that were in {@link #WALDIR} before it.
   */
  @Test
  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    List<FaultyProtobufWALStreamReader.FailureType> failureTypes =
      Arrays.stream(FaultyProtobufWALStreamReader.FailureType.values())
        .filter(x -> x != FaultyProtobufWALStreamReader.FailureType.NONE)
        .collect(Collectors.toList());
    for (FaultyProtobufWALStreamReader.FailureType failureType : failureTypes) {
      final Set<String> walDirContents = splitCorruptWALs(failureType);
      final Set<String> archivedLogs = new HashSet<>();
      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
        archived.append("\n\t").append(log.toString());
        archivedLogs.add(log.getPath().getName());
      }
      LOG.debug(archived.toString());
      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.",
        walDirContents, archivedLogs);
    }
  }

  /**
   * Regenerates the WALs with the faulty stream reader configured and splits them. The previously
   * configured reader class is restored before returning.
   * @param failureType failure the faulty reader is configured to produce
   * @return set of wal names present prior to split attempt.
   * @throws IOException if the split process fails
   */
  private Set<String> splitCorruptWALs(final FaultyProtobufWALStreamReader.FailureType failureType)
    throws IOException {
    String backupClass = conf.get(WALFactory.WAL_STREAM_READER_CLASS_IMPL);
    InstrumentedLogWriter.activateFailure = false;

    try {
      conf.setClass(WALFactory.WAL_STREAM_READER_CLASS_IMPL, FaultyProtobufWALStreamReader.class,
        WALStreamReader.class);
      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
      // Clean up from previous tests or previous loop
      try {
        wals.shutdown();
      } catch (IOException exception) {
        // since we're splitting out from under the factory, we should expect some closing failures.
        LOG.debug("Ignoring problem closing WALFactory.", exception);
      }
      wals.close();
      try {
        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
          fs.delete(log.getPath(), true);
        }
      } catch (FileNotFoundException exception) {
        LOG.debug("no previous CORRUPTDIR to clean.");
      }
      // change to the faulty reader
      wals = new WALFactory(conf, name.getMethodName());
      generateWALs(-1);
      // Our reader will render all of these files corrupt.
      final Set<String> walDirContents = new HashSet<>();
      for (FileStatus status : fs.listStatus(WALDIR)) {
        walDirContents.add(status.getPath().getName());
      }
      useDifferentDFSClient();
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
      return walDirContents;
    } finally {
      if (backupClass != null) {
        conf.set(WALFactory.WAL_STREAM_READER_CLASS_IMPL, backupClass);
      } else {
        conf.unset(WALFactory.WAL_STREAM_READER_CLASS_IMPL);
      }
    }
  }

  @Test(expected = IOException.class)
  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
    splitCorruptWALs(FaultyProtobufWALStreamReader.FailureType.BEGINNING);
  }

  @Test
  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
    try {
      splitCorruptWALs(FaultyProtobufWALStreamReader.FailureType.BEGINNING);
    } catch (IOException e) {
      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
    }
    assertEquals("if skip.errors is false all files should remain in place", NUM_WRITERS,
      fs.listStatus(WALDIR).length);
  }

  /**
   * Writes one WAL for a single region, applies {@code corruption} to it, splits with skip-errors
   * off, and checks both the number of readable entries and that nothing landed in
   * {@link #CORRUPTDIR}.
   * @param corruption    damage to apply to the generated WAL
   * @param entryCount    entries to write before corrupting
   * @param expectedCount entries expected in the region's split file
   */
  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
    final int expectedCount) throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);

    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
    generateWALs(1, entryCount, -1, 0);
    corruptWAL(c1, corruption, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    int actualCount = 0;
    try (WALStreamReader in = wals.createStreamReader(fs, splitLog[0])) {
      while (in.next() != null) {
        ++actualCount;
      }
    }
    assertEquals(expectedCount, actualCount);

    // should not have stored the EOF files as corrupt
    FileStatus[] archivedLogs =
      fs.exists(CORRUPTDIR) ? fs.listStatus(CORRUPTDIR) : new FileStatus[0];
    assertEquals(0, archivedLogs.length);
  }

  @Test
  public void testEOFisIgnored() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
  }

  @Test
  public void testCorruptWALTrailer() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
  }

  @Test
  public void testLogsGetArchivedAfterSplit() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
  }

  @Test
  public void testSplit() throws IOException {
    generateWALs(-1);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() throws IOException {
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    try {
      FileStatus[] statuses = fs.listStatus(WALDIR);
      if (statuses != null) {
        fail("Files left in log dir: " + Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
      }
    } catch (FileNotFoundException expected) {
      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
    }
  }

  @Test(expected = IOException.class)
  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
    // leave 5th log open so we could append the "trap"
    Writer writer = generateWALs(4);
    useDifferentDFSClient();

    String region = "break";
    Path regiondir = new Path(TABLEDIR, region);
    fs.mkdirs(regiondir);

    InstrumentedLogWriter.activateFailure = false;
    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes("r" + 999), FAMILY,
      QUALIFIER, VALUE, 0);
    writer.close();

    try {
      InstrumentedLogWriter.activateFailure = true;
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    } catch (IOException e) {
      assertTrue(e.getMessage()
        .contains("This exception is instrumented and should only be thrown for testing"));
      throw e;
    } finally {
      InstrumentedLogWriter.activateFailure = false;
    }
  }

  @Test
  public void testSplitDeletedRegion() throws IOException {
    REGIONS.clear();
    String region = "region_that_splits";
    REGIONS.add(region);

    generateWALs(1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, region);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  /**
   * Splits the largest generated WAL with a splitter whose writers always fail on append, and
   * expects the injected IOException to surface from {@code splitWAL}.
   */
  @Test
  public void testIOEOnOutputThread() throws Exception {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);

    generateWALs(-1);
    useDifferentDFSClient();
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file", logfiles != null && logfiles.length > 0);
    // wals with no entries (like the one we don't use in the factory)
    // won't cause a failure since nothing will ever be written.
    // pick the largest one since it's most likely to have entries.
    int largestLogFile = 0;
    long largestSize = 0;
    for (int i = 0; i < logfiles.length; i++) {
      if (logfiles[i].getLen() > largestSize) {
        largestLogFile = i;
        largestSize = logfiles[i].getLen();
      }
    }
    assertTrue("There should be some log greater than size 0.", 0 < largestSize);
    // Set up a splitter that will throw an IOE on the output side
    WALSplitter logSplitter =
      new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
        @Override
        protected Writer createWriter(Path logfile) throws IOException {
          Writer mockWriter = Mockito.mock(Writer.class);
          Mockito.doThrow(new IOException("Injected")).when(mockWriter)
            .append(Mockito.<Entry> any());
          return mockWriter;
        }
      };
    // Set up a background thread dumper. Needs a thread to depend on and then we need to run
    // the thread dumping in a background thread so it does not hold up the test.
    final AtomicBoolean stop = new AtomicBoolean(false);
    final Thread someOldThread = new Thread("Some-old-thread") {
      @Override
      public void run() {
        while (!stop.get()) {
          Threads.sleep(10);
        }
      }
    };
    someOldThread.setDaemon(true);
    someOldThread.start();
    final Thread t = new Thread("Background-thread-dumper") {
      @Override
      public void run() {
        try {
          Threads.threadDumpingIsAlive(someOldThread);
        } catch (InterruptedException e) {
          // Report through the test logger and keep the interrupt status visible.
          LOG.warn("Background thread dumper interrupted", e);
          Thread.currentThread().interrupt();
        }
      }
    };
    t.setDaemon(true);
    t.start();
    try {
      logSplitter.splitWAL(logfiles[largestLogFile], null);
      fail("Didn't throw!");
    } catch (IOException ioe) {
      assertTrue(ioe.toString().contains("Injected"));
    } finally {
      // Setting this to true will turn off the background thread dumper.
      stop.set(true);
    }
  }

  /**
   * Splits through {@code spiedFs} and expects the split to complete anyway: all WALs archived and
   * {@link #WALDIR} gone.
   * @param spiedFs should be instrumented for failure.
   */
  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
    generateWALs(-1);
    useDifferentDFSClient();

    try {
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
      assertFalse(fs.exists(WALDIR));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }

  // Test for HBASE-3412
  @Test
  public void testMovedWALDuringRecovery() throws Exception {
    // This partial mock will throw LEE for every file simulating
    // files that were moved
    FileSystem spiedFs = Mockito.spy(fs);
    // The "File does not exist" part is very important,
    // that's how it comes out of HDFS
    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).when(spiedFs)
      .append(Mockito.<Path> any());
    retryOverHdfsProblem(spiedFs);
  }

  @Test
  public void testRetryOpenDuringRecovery() throws Exception {
    FileSystem spiedFs = Mockito.spy(fs);
    // The "Cannot obtain block length", "Could not obtain the last block",
    // and "Blocklist for [^ ]* has changed.*" part is very important,
    // that's how it comes out of HDFS. If HDFS changes the exception
    // message, this test needs to be adjusted accordingly.
    //
    // When DFSClient tries to open a file, HDFS needs to locate
    // the last block of the file and get its length. However, if the
    // last block is under recovery, HDFS may have problem to obtain
    // the block length, in which case, retry may help.
919 Mockito.doAnswer(new Answer<FSDataInputStream>() { 920 private final String[] errors = new String[] { "Cannot obtain block length", 921 "Could not obtain the last block", "Blocklist for " + OLDLOGDIR + " has changed" }; 922 private int count = 0; 923 924 @Override 925 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { 926 if (count < 3) { 927 throw new IOException(errors[count++]); 928 } 929 return (FSDataInputStream) invocation.callRealMethod(); 930 } 931 }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt()); 932 retryOverHdfsProblem(spiedFs); 933 } 934 935 @Test 936 public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException { 937 generateWALs(1, 10, -1); 938 FileStatus logfile = fs.listStatus(WALDIR)[0]; 939 useDifferentDFSClient(); 940 941 final AtomicInteger count = new AtomicInteger(); 942 943 CancelableProgressable localReporter = new CancelableProgressable() { 944 @Override 945 public boolean progress() { 946 count.getAndIncrement(); 947 return false; 948 } 949 }; 950 951 FileSystem spiedFs = Mockito.spy(fs); 952 Mockito.doAnswer(new Answer<FSDataInputStream>() { 953 @Override 954 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { 955 Thread.sleep(1500); // Sleep a while and wait report status invoked 956 return (FSDataInputStream) invocation.callRealMethod(); 957 } 958 }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt()); 959 960 try { 961 conf.setInt("hbase.splitlog.report.period", 1000); 962 boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null, 963 Mockito.mock(SplitLogWorkerCoordination.class), wals, null); 964 assertFalse("Log splitting should failed", ret); 965 assertTrue(count.get() > 0); 966 } catch (IOException e) { 967 fail("There shouldn't be any exception but: " + e.toString()); 968 } finally { 969 // reset it back to its default value 970 conf.setInt("hbase.splitlog.report.period", 59000); 971 } 
972 } 973 974 /** 975 * Test log split process with fake data and lots of edits to trigger threading issues. 976 */ 977 @Test 978 public void testThreading() throws Exception { 979 doTestThreading(20000, 128 * 1024 * 1024, 0); 980 } 981 982 /** 983 * Test blocking behavior of the log split process if writers are writing slower than the reader 984 * is reading. 985 */ 986 @Test 987 public void testThreadingSlowWriterSmallBuffer() throws Exception { 988 // The logic of this test has conflict with the limit writers split logic, skip this test for 989 // TestWALSplitBoundedLogWriterCreation 990 assumeFalse(this instanceof TestWALSplitBoundedLogWriterCreation); 991 doTestThreading(200, 1024, 50); 992 } 993 994 /** 995 * Sets up a log splitter with a mock reader and writer. The mock reader generates a specified 996 * number of edits spread across 5 regions. The mock writer optionally sleeps for each edit it is 997 * fed. After the split is complete, verifies that the statistics show the correct number of edits 998 * output into each region. 
999 * @param numFakeEdits number of fake edits to push through pipeline 1000 * @param bufferSize size of in-memory buffer 1001 * @param writerSlowness writer threads will sleep this many ms per edit 1002 */ 1003 private void doTestThreading(final int numFakeEdits, final int bufferSize, 1004 final int writerSlowness) throws Exception { 1005 1006 Configuration localConf = new Configuration(conf); 1007 localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize); 1008 1009 // Create a fake log file (we'll override the reader to produce a stream of edits) 1010 Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake"); 1011 FSDataOutputStream out = fs.create(logPath); 1012 out.close(); 1013 1014 // Make region dirs for our destination regions so the output doesn't get skipped 1015 final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4"); 1016 makeRegionDirs(regions); 1017 1018 // Create a splitter that reads and writes the data without touching disk 1019 WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs) { 1020 /* Produce a mock writer that doesn't write anywhere */ 1021 @Override 1022 protected Writer createWriter(Path logfile) throws IOException { 1023 Writer mockWriter = Mockito.mock(Writer.class); 1024 Mockito.doAnswer(new Answer<Void>() { 1025 int expectedIndex = 0; 1026 1027 @Override 1028 public Void answer(InvocationOnMock invocation) { 1029 if (writerSlowness > 0) { 1030 try { 1031 Thread.sleep(writerSlowness); 1032 } catch (InterruptedException ie) { 1033 Thread.currentThread().interrupt(); 1034 } 1035 } 1036 Entry entry = (Entry) invocation.getArgument(0); 1037 WALEdit edit = entry.getEdit(); 1038 List<Cell> cells = edit.getCells(); 1039 assertEquals(1, cells.size()); 1040 Cell cell = cells.get(0); 1041 1042 // Check that the edits come in the right order. 
1043 assertEquals(expectedIndex, 1044 Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); 1045 expectedIndex++; 1046 return null; 1047 } 1048 }).when(mockWriter).append(Mockito.<Entry> any()); 1049 return mockWriter; 1050 } 1051 1052 /* Produce a mock reader that generates fake entries */ 1053 @Override 1054 protected WALStreamReader getReader(FileStatus file, boolean skipErrors, 1055 CancelableProgressable reporter) throws IOException, CorruptedLogFileException { 1056 WALStreamReader mockReader = Mockito.mock(WALStreamReader.class); 1057 Mockito.doAnswer(new Answer<Entry>() { 1058 int index = 0; 1059 1060 @Override 1061 public Entry answer(InvocationOnMock invocation) throws Throwable { 1062 if (index >= numFakeEdits) { 1063 return null; 1064 } 1065 1066 // Generate r0 through r4 in round robin fashion 1067 int regionIdx = index % regions.size(); 1068 byte region[] = new byte[] { (byte) 'r', (byte) (0x30 + regionIdx) }; 1069 1070 Entry ret = createTestEntry(TABLE_NAME, region, Bytes.toBytes(index / regions.size()), 1071 FAMILY, QUALIFIER, VALUE, index); 1072 index++; 1073 return ret; 1074 } 1075 }).when(mockReader).next(); 1076 return mockReader; 1077 } 1078 }; 1079 1080 logSplitter.splitWAL(fs.getFileStatus(logPath), null); 1081 1082 // Verify number of written edits per region 1083 Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts(); 1084 for (Map.Entry<String, Long> entry : outputCounts.entrySet()) { 1085 LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey()); 1086 assertEquals((long) entry.getValue(), numFakeEdits / regions.size()); 1087 } 1088 assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size()); 1089 } 1090 1091 // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests? 
1092 @Test 1093 public void testSplitLogFileDeletedRegionDir() throws IOException { 1094 LOG.info("testSplitLogFileDeletedRegionDir"); 1095 final String REGION = "region__1"; 1096 REGIONS.clear(); 1097 REGIONS.add(REGION); 1098 1099 generateWALs(1, 10, -1); 1100 useDifferentDFSClient(); 1101 1102 Path regiondir = new Path(TABLEDIR, REGION); 1103 LOG.info("Region directory is" + regiondir); 1104 fs.delete(regiondir, true); 1105 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1106 assertFalse(fs.exists(regiondir)); 1107 } 1108 1109 @Test 1110 public void testSplitLogFileEmpty() throws IOException { 1111 LOG.info("testSplitLogFileEmpty"); 1112 // we won't create the hlog dir until getWAL got called, so 1113 // make dir here when testing empty log file 1114 fs.mkdirs(WALDIR); 1115 injectEmptyFile(".empty", true); 1116 useDifferentDFSClient(); 1117 1118 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1119 Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME); 1120 assertFalse(fs.exists(tdir)); 1121 1122 assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath())); 1123 } 1124 1125 @Test 1126 public void testSplitLogFileMultipleRegions() throws IOException { 1127 LOG.info("testSplitLogFileMultipleRegions"); 1128 generateWALs(1, 10, -1); 1129 splitAndCount(1, 10); 1130 } 1131 1132 @Test 1133 public void testSplitLogFileFirstLineCorruptionLog() throws IOException { 1134 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); 1135 generateWALs(1, 10, -1); 1136 FileStatus logfile = fs.listStatus(WALDIR)[0]; 1137 1138 corruptWAL(logfile.getPath(), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true); 1139 1140 useDifferentDFSClient(); 1141 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1142 1143 final Path corruptDir = 1144 new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); 1145 assertEquals(1, fs.listStatus(corruptDir).length); 1146 } 1147 1148 /** 1149 * @see 
"https://issues.apache.org/jira/browse/HBASE-4862" 1150 */ 1151 @Test 1152 public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException { 1153 LOG.info("testConcurrentSplitLogAndReplayRecoverEdit"); 1154 // Generate wals for our destination region 1155 String regionName = "r0"; 1156 final Path regiondir = new Path(TABLEDIR, regionName); 1157 REGIONS.clear(); 1158 REGIONS.add(regionName); 1159 generateWALs(-1); 1160 1161 wals.getWAL(null); 1162 FileStatus[] logfiles = fs.listStatus(WALDIR); 1163 assertTrue("There should be some log file", logfiles != null && logfiles.length > 0); 1164 1165 WALSplitter logSplitter = 1166 new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) { 1167 @Override 1168 protected Writer createWriter(Path logfile) throws IOException { 1169 Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile); 1170 // After creating writer, simulate region's 1171 // replayRecoveredEditsIfAny() which gets SplitEditFiles of this 1172 // region and delete them, excluding files with '.temp' suffix. 
1173 NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir); 1174 if (files != null && !files.isEmpty()) { 1175 for (Path file : files) { 1176 if (!this.walFS.delete(file, false)) { 1177 LOG.error("Failed delete of " + file); 1178 } else { 1179 LOG.debug("Deleted recovered.edits file=" + file); 1180 } 1181 } 1182 } 1183 return writer; 1184 } 1185 }; 1186 try { 1187 logSplitter.splitWAL(logfiles[0], null); 1188 } catch (IOException e) { 1189 LOG.info(e.toString(), e); 1190 fail("Throws IOException when spliting " 1191 + "log, it is most likely because writing file does not " 1192 + "exist which is caused by concurrent replayRecoveredEditsIfAny()"); 1193 } 1194 if (fs.exists(CORRUPTDIR)) { 1195 if (fs.listStatus(CORRUPTDIR).length > 0) { 1196 fail("There are some corrupt logs, " 1197 + "it is most likely caused by concurrent replayRecoveredEditsIfAny()"); 1198 } 1199 } 1200 } 1201 1202 @Test 1203 public void testRecoveredEditsStoragePolicy() throws IOException { 1204 conf.set(HConstants.WAL_STORAGE_POLICY, "ALL_SSD"); 1205 try { 1206 Path path = createRecoveredEditsPathForRegion(); 1207 assertEquals("ALL_SSD", fs.getStoragePolicy(path.getParent()).getName()); 1208 } finally { 1209 conf.unset(HConstants.WAL_STORAGE_POLICY); 1210 } 1211 } 1212 1213 /** 1214 * See HBASE-27644, typically we should not have empty WALEdit but we should be able to process 1215 * it, instead of losing data after it. 
1216 */ 1217 @Test 1218 public void testEmptyWALEdit() throws IOException { 1219 final String region = "region__5"; 1220 REGIONS.clear(); 1221 REGIONS.add(region); 1222 makeRegionDirs(REGIONS); 1223 fs.mkdirs(WALDIR); 1224 Path path = new Path(WALDIR, WAL_FILE_PREFIX + 5); 1225 generateEmptyEditWAL(path, Bytes.toBytes(region)); 1226 useDifferentDFSClient(); 1227 1228 Path regiondir = new Path(TABLEDIR, region); 1229 fs.mkdirs(regiondir); 1230 List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1231 // Make sure that WALSplitter generate the split file 1232 assertEquals(1, splitPaths.size()); 1233 1234 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 1235 assertEquals(11, countWAL(originalLog)); 1236 // we will skip the empty WAL when splitting 1237 assertEquals(10, countWAL(splitPaths.get(0))); 1238 } 1239 1240 private void generateEmptyEditWAL(Path path, byte[] region) throws IOException { 1241 fs.mkdirs(WALDIR); 1242 try (Writer writer = wals.createWALWriter(fs, path)) { 1243 long seq = 0; 1244 appendEmptyEntry(writer, TABLE_NAME, region, seq++); 1245 for (int i = 0; i < 10; i++) { 1246 appendEntry(writer, TABLE_NAME, region, Bytes.toBytes(i), FAMILY, QUALIFIER, VALUE, seq++); 1247 } 1248 } 1249 } 1250 1251 private Writer generateWALs(int leaveOpen) throws IOException { 1252 return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0); 1253 } 1254 1255 private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException { 1256 return generateWALs(writers, entries, leaveOpen, 7); 1257 } 1258 1259 private void makeRegionDirs(List<String> regions) throws IOException { 1260 for (String region : regions) { 1261 LOG.debug("Creating dir for region " + region); 1262 fs.mkdirs(new Path(TABLEDIR, region)); 1263 } 1264 } 1265 1266 /** 1267 * @param leaveOpen index to leave un-closed. -1 to close all. 1268 * @return the writer that's still open, or null if all were closed. 
1269 */ 1270 private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents) 1271 throws IOException { 1272 makeRegionDirs(REGIONS); 1273 fs.mkdirs(WALDIR); 1274 Writer[] ws = new Writer[writers]; 1275 int seq = 0; 1276 int numRegionEventsAdded = 0; 1277 for (int i = 0; i < writers; i++) { 1278 ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i)); 1279 for (int j = 0; j < entries; j++) { 1280 int prefix = 0; 1281 for (String region : REGIONS) { 1282 String row_key = region + prefix++ + i + j; 1283 appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY, 1284 QUALIFIER, VALUE, seq++); 1285 1286 if (numRegionEventsAdded < regionEvents) { 1287 numRegionEventsAdded++; 1288 appendRegionEvent(ws[i], region); 1289 } 1290 } 1291 } 1292 if (i != leaveOpen) { 1293 ws[i].close(); 1294 LOG.info("Closing writer " + i); 1295 } 1296 } 1297 if (leaveOpen < 0 || leaveOpen >= writers) { 1298 return null; 1299 } 1300 return ws[leaveOpen]; 1301 } 1302 1303 private Path[] getLogForRegion(TableName table, String region) throws IOException { 1304 Path tdir = CommonFSUtils.getWALTableDir(conf, table); 1305 @SuppressWarnings("deprecation") 1306 Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir( 1307 HRegion.getRegionDir(tdir, Bytes.toString(Bytes.toBytes(region)))); 1308 FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { 1309 @Override 1310 public boolean accept(Path p) { 1311 if (WALSplitUtil.isSequenceIdFile(p)) { 1312 return false; 1313 } 1314 return true; 1315 } 1316 }); 1317 Path[] paths = new Path[files.length]; 1318 for (int i = 0; i < files.length; i++) { 1319 paths[i] = files[i].getPath(); 1320 } 1321 return paths; 1322 } 1323 1324 private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException { 1325 FSDataOutputStream out; 1326 int fileSize = (int) fs.listStatus(path)[0].getLen(); 1327 1328 FSDataInputStream in = fs.open(path); 1329 byte[] corrupted_bytes = 
new byte[fileSize]; 1330 in.readFully(0, corrupted_bytes, 0, fileSize); 1331 in.close(); 1332 1333 switch (corruption) { 1334 case APPEND_GARBAGE: 1335 fs.delete(path, false); 1336 out = fs.create(path); 1337 out.write(corrupted_bytes); 1338 out.write(Bytes.toBytes("-----")); 1339 closeOrFlush(close, out); 1340 break; 1341 1342 case INSERT_GARBAGE_ON_FIRST_LINE: 1343 fs.delete(path, false); 1344 out = fs.create(path); 1345 out.write(0); 1346 out.write(corrupted_bytes); 1347 closeOrFlush(close, out); 1348 break; 1349 1350 case INSERT_GARBAGE_IN_THE_MIDDLE: 1351 fs.delete(path, false); 1352 out = fs.create(path); 1353 int middle = (int) Math.floor(corrupted_bytes.length / 2); 1354 out.write(corrupted_bytes, 0, middle); 1355 out.write(0); 1356 out.write(corrupted_bytes, middle, corrupted_bytes.length - middle); 1357 closeOrFlush(close, out); 1358 break; 1359 1360 case TRUNCATE: 1361 fs.delete(path, false); 1362 out = fs.create(path); 1363 out.write(corrupted_bytes, 0, fileSize 1364 - (32 + AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT)); 1365 closeOrFlush(close, out); 1366 break; 1367 1368 case TRUNCATE_TRAILER: 1369 fs.delete(path, false); 1370 out = fs.create(path); 1371 out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated. 
1372 closeOrFlush(close, out); 1373 break; 1374 } 1375 } 1376 1377 private void closeOrFlush(boolean close, FSDataOutputStream out) throws IOException { 1378 if (close) { 1379 out.close(); 1380 } else { 1381 Method syncMethod = null; 1382 try { 1383 syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {}); 1384 } catch (NoSuchMethodException e) { 1385 try { 1386 syncMethod = out.getClass().getMethod("sync", new Class<?>[] {}); 1387 } catch (NoSuchMethodException ex) { 1388 throw new IOException( 1389 "This version of Hadoop supports " + "neither Syncable.sync() nor Syncable.hflush()."); 1390 } 1391 } 1392 try { 1393 syncMethod.invoke(out, new Object[] {}); 1394 } catch (Exception e) { 1395 throw new IOException(e); 1396 } 1397 // Not in 0out.hflush(); 1398 } 1399 } 1400 1401 private int countWAL(Path log) throws IOException { 1402 int count = 0; 1403 try (WALStreamReader in = wals.createStreamReader(fs, log)) { 1404 while (in.next() != null) { 1405 count++; 1406 } 1407 } 1408 return count; 1409 } 1410 1411 private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs, 1412 String output) throws IOException { 1413 WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder(); 1414 desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes())) 1415 .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes())) 1416 .setRegionName(ByteString.copyFrom(hri.getRegionName())) 1417 .setFamilyName(ByteString.copyFrom(FAMILY)) 1418 .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY)) 1419 .addAllCompactionInput(Arrays.asList(inputs)).addCompactionOutput(output); 1420 1421 WALEdit edit = WALEdit.createCompaction(hri, desc.build()); 1422 WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1, 1423 EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID); 1424 w.append(new Entry(key, edit)); 1425 w.sync(false); 1426 } 1427 1428 private static void 
appendRegionEvent(Writer w, String region) throws IOException { 1429 WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor( 1430 WALProtos.RegionEventDescriptor.EventType.REGION_OPEN, TABLE_NAME.toBytes(), 1431 Bytes.toBytes(region), Bytes.toBytes(String.valueOf(region.hashCode())), 1, 1432 ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>> of()); 1433 final long time = EnvironmentEdgeManager.currentTime(); 1434 final WALKeyImpl walKey = 1435 new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time, HConstants.DEFAULT_CLUSTER_ID); 1436 WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc); 1437 w.append(new Entry(walKey, we)); 1438 w.sync(false); 1439 } 1440 1441 private static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row, 1442 byte[] family, byte[] qualifier, byte[] value, long seq) throws IOException { 1443 LOG.info(Thread.currentThread().getName() + " append"); 1444 writer.append(createTestEntry(table, region, row, family, qualifier, value, seq)); 1445 LOG.info(Thread.currentThread().getName() + " sync"); 1446 writer.sync(false); 1447 return seq; 1448 } 1449 1450 private static Entry createTestEntry(TableName table, byte[] region, byte[] row, byte[] family, 1451 byte[] qualifier, byte[] value, long seq) { 1452 long time = System.nanoTime(); 1453 1454 final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value); 1455 WALEdit edit = new WALEdit(); 1456 edit.add(cell); 1457 return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), edit); 1458 } 1459 1460 private static long appendEmptyEntry(Writer writer, TableName table, byte[] region, long seq) 1461 throws IOException { 1462 LOG.info(Thread.currentThread().getName() + " append"); 1463 writer.append(createEmptyEntry(table, region, seq)); 1464 LOG.info(Thread.currentThread().getName() + " sync"); 1465 writer.sync(false); 1466 return 
seq; 1467 } 1468 1469 private static Entry createEmptyEntry(TableName table, byte[] region, long seq) { 1470 long time = System.nanoTime(); 1471 return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), 1472 new WALEdit()); 1473 } 1474 1475 private void injectEmptyFile(String suffix, boolean closeFile) throws IOException { 1476 Writer writer = 1477 WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf); 1478 if (closeFile) { 1479 writer.close(); 1480 } 1481 } 1482 1483 private boolean logsAreEqual(Path p1, Path p2) throws IOException { 1484 try (WALStreamReader in1 = wals.createStreamReader(fs, p1); 1485 WALStreamReader in2 = wals.createStreamReader(fs, p2)) { 1486 Entry entry1; 1487 Entry entry2; 1488 while ((entry1 = in1.next()) != null) { 1489 entry2 = in2.next(); 1490 if ( 1491 (entry1.getKey().compareTo(entry2.getKey()) != 0) 1492 || (!entry1.getEdit().toString().equals(entry2.getEdit().toString())) 1493 ) { 1494 return false; 1495 } 1496 } 1497 } 1498 return true; 1499 } 1500}