/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
import static org.apache.hadoop.hbase.wal.WALSplitter.WAL_SPLIT_TO_HFILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestWALSplitToHFile {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWALSplitToHFile.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestWALSplitToHFile.class);
  static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
  private Path rootDir = null;
  private String logName;
  private Path oldLogDir;
  private Path logDir;
  private FileSystem fs;
  private Configuration conf;
  private WALFactory wals;

  private static final byte[] ROW = Bytes.toBytes("row");
  private static final byte[] QUALIFIER = Bytes.toBytes("q");
  private static final byte[] VALUE1 = Bytes.toBytes("value1");
  private static final byte[] VALUE2 = Bytes.toBytes("value2");
  private static final int countPerFamily = 10;

  @Rule
  public final TestName TEST_NAME = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setBoolean(WAL_SPLIT_TO_HFILE, true);
    UTIL.startMiniCluster(3);
    Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
    LOG.info("hbase.rootdir=" + hbaseRootDir);
    CommonFSUtils.setRootDir(conf, hbaseRootDir);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, false);
    this.fs = UTIL.getDFSCluster().getFileSystem();
    this.rootDir = CommonFSUtils.getRootDir(this.conf);
    this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName = ServerName
      .valueOf(TEST_NAME.getMethodName() + "-manual", 16010, EnvironmentEdgeManager.currentTime())
      .toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(this.rootDir, logName);
    if (UTIL.getDFSCluster().getFileSystem().exists(this.rootDir)) {
      UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
    }
    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
    this.wals.close();
    UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
  }

  /**
   * @param p Directory to clean up
   */
  private void deleteDir(final Path p) throws IOException {
    if (this.fs.exists(p)) {
      if (!this.fs.delete(p, true)) {
        throw new IOException("Failed remove of " + p);
      }
    }
  }
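
  /**
   * Create a table with three column families ("a", "b", "c") through the admin and return its
   * descriptor for the region opens done by each test.
   */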
  private TableDescriptor createBasic3FamilyTD(final TableName tableName) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("a")).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("b")).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c")).build());
    TableDescriptor td = builder.build();
    UTIL.getAdmin().createTable(td);
    return td;
  }

  private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
    FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
    wal.init();
    return wal;
  }

  private WAL createWAL(FileSystem fs, Path hbaseRootDir, String logName) throws IOException {
    FSHLog wal = new FSHLog(fs, hbaseRootDir, logName, this.conf);
    wal.init();
    return wal;
  }

  private Pair<TableDescriptor, RegionInfo> setupTableAndRegion() throws IOException {
    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
    final TableDescriptor td = createBasic3FamilyTD(tableName);
    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
    final Path tableDir = CommonFSUtils.getTableDir(this.rootDir, tableName);
    deleteDir(tableDir);
    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
    HRegion region = HBaseTestingUtil.createRegionAndWAL(ri, rootDir, this.conf, td);
    HBaseTestingUtil.closeRegionAndWAL(region);
    return new Pair<>(td, ri);
  }

  private void writeData(TableDescriptor td, HRegion region) throws IOException {
    final long timestamp = this.ee.currentTime();
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
    }
  }

  @Test
  public void testDifferentRootDirAndWALRootDir() throws Exception {
    // Change wal root dir and reset the configuration
    Path walRootDir = UTIL.createWALRootDir();
    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());

    FileSystem walFs = CommonFSUtils.getWALFileSystem(this.conf);
    this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName = ServerName
      .valueOf(TEST_NAME.getMethodName() + "-manual", 16010, EnvironmentEdgeManager.currentTime())
      .toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(walRootDir, logName);
    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());

    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(walFs, walRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    writeData(td, region);

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(walRootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    WAL wal2 = createWAL(walFs, walRootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
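    // Every family's unflushed edit should have been recovered even though the WAL lived under a
    // different root dir than the data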
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
    }
  }

  @Test
  public void testCorruptRecoveredHFile() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    writeData(td, region);

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // Write a corrupt recovered hfile
    Path regionDir =
      new Path(CommonFSUtils.getTableDir(rootDir, td.getTableName()), ri.getEncodedName());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      FileStatus[] files =
        WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
      assertNotNull(files);
      assertTrue(files.length > 0);
      writeCorruptRecoveredHFile(files[0].getPath());
    }

    // Reopening the region should fail on the corrupt recovered hfile
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    try {
      HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
      fail("Should fail to open region");
    } catch (CorruptHFileException che) {
      // Expected
    }

    // Set skip errors to true and reopen the region
    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, true);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
      // Assert the corrupt file was skipped and still exists
      FileStatus[] files =
        WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
      assertNotNull(files);
      assertEquals(1, files.length);
      assertTrue(files[0].getPath().getName().contains("corrupt"));
    }
  }

  @Test
  public void testPutWithSameTimestamp() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    final long timestamp = this.ee.currentTime();
    // Write data and flush
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
    }
    region.flush(true);

    // Write data with the same timestamp and do not flush
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE2));
    }
    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // reopen the region
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
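    // Both puts share the same timestamp, so recovery must order them by sequence id: the
    // unflushed VALUE2 edit should win over the flushed VALUE1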
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE2, result2.getValue(cfd.getName(), QUALIFIER)));
    }
  }

  @Test
  public void testRecoverSequenceId() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    Map<Integer, Map<String, Long>> seqIdMap = new HashMap<>();
    // Write data and do not flush
    for (int i = 0; i < countPerFamily; i++) {
      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
        region.put(new Put(Bytes.toBytes(i)).addColumn(cfd.getName(), QUALIFIER, VALUE1));
        Result result = region.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
        ExtendedCell[] cells = ClientInternalHelper.getExtendedRawCells(result);
        assertEquals(1, cells.length);
        seqIdMap.computeIfAbsent(i, r -> new HashMap<>()).put(cfd.getNameAsString(),
          cells[0].getSequenceId());
      }
    }

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // reopen the region
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    // assert the seqid was recovered
    for (int i = 0; i < countPerFamily; i++) {
      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
        Result result = region2.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
        ExtendedCell[] cells = ClientInternalHelper.getExtendedRawCells(result);
        assertEquals(1, cells.length);
        assertEquals((long) seqIdMap.get(i).get(cfd.getNameAsString()), cells[0].getSequenceId());
      }
    }
  }

  /**
   * Test writing edits into an HRegion, closing it, splitting logs, opening the region again.
   * Verify seqids.
   */
  @Test
  public void testWrittenViaHRegion()
    throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families. Do a flush on one
    // of the families during the load of edits so its seqid is not same as
    // others to test we do right thing when different seqids.
    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    long seqid = region.getOpenSeqNum();
    boolean first = true;
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
      if (first) {
        // Flush the first family so at least one family has a seqid different from the rest.
        region.flush(true);
        first = false;
      }
    }
    // Now assert edits made it in.
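    // Each family received countPerFamily edits on ROW, so the Get below should return
    // countPerFamily cells per family.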
    final Get g = new Get(ROW);
    Result result = region.get(g);
    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
    // Now close the region (without flush), split the log, reopen the region and assert that
    // replay of log has the correct effect, that our seqids are calculated correctly so
    // all edits in logs are seen as 'stale'/old.
    region.close(true);
    wal.shutdown();
    try {
      WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    } catch (Exception e) {
      LOG.debug("Got exception", e);
    }

    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);
    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());

    // Next test. Add more edits, then 'crash' this region by stealing its wal
    // out from under it and assert that replay of the log adds the edits back
    // correctly when region is opened again.
    for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
      addRegionEdits(ROW, hcd.getName(), countPerFamily, this.ee, region2, "y");
    }
    // Get count of edits.
    final Result result2 = region2.get(g);
    assertEquals(2 * result.size(), result2.size());
    wal2.sync();
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtil.getDifferentUser(newConf, td.getTableName().getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
        FileSystem newFS = FileSystem.get(newConf);
        // Make a new wal for the new region open.
        WAL wal3 = createWAL(newConf, rootDir, logName);
        Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
        HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null);
        long seqid3 = region3.initialize();
        Result result3 = region3.get(g);
        // Assert that count of cells is same as before crash.
        assertEquals(result2.size(), result3.size());

        // We can't close wal2. It has been appropriated when we split.
        region3.close();
        wal3.close();
        return null;
      }
    });
  }

  /**
   * Test that we recover correctly when there is a failure in between the flushes, i.e. some
   * stores got flushed but others did not. Unfortunately, there is no easy hook to flush at a
   * store level. The way we get around this is by flushing at the region level, and then deleting
   * the recently flushed store file for one of the stores. This puts us back in the situation
   * where all but that store got flushed and the region died. We restart the region and verify
   * that the edits were replayed.
   */
  @Test
  public void testAfterPartialFlush()
    throws IOException, SecurityException, IllegalArgumentException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families; no flush happens during the load here.
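    // The "partial flush" failure is simulated after the writes: flush the whole region, then
    // delete one family's store files before replay.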
    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    long seqid = region.getOpenSeqNum();
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
    }

    // Now assert edits made it in.
    final Get g = new Get(ROW);
    Result result = region.get(g);
    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());

    // Let us flush the region
    region.flush(true);
    region.close(true);
    wal.shutdown();

    // Delete the store files in the second column family to simulate a failure
    // in between the flushes: we have 3 families, and killing the middle one
    // ensures that taking the maximum seqid across stores would make us fail.
    int cf_count = 0;
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      cf_count++;
      if (cf_count == 2) {
        region.getRegionFileSystem().deleteFamily(cfd.getNameAsString());
      }
    }

    // Let us try to split and recover
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);

    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());
  }

  /**
   * Test that we can recover the data correctly after aborting a flush. In the test, we first
   * abort a flush after writing some data, then write more data and flush again, and at last
   * verify the data.
   */
  @Test
  public void testAfterAbortingFlush() throws IOException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write data into the three families; the first flush below is made to fail via a custom
    // store flusher.
    WAL wal = createWAL(this.conf, rootDir, logName);
    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
    Mockito.doReturn(false).when(rsServices).isAborted();
    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
    when(rsServices.getConfiguration()).thenReturn(conf);
    Configuration customConf = new Configuration(this.conf);
    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
      AbstractTestWALReplay.CustomStoreFlusher.class.getName());
    HRegion region = HRegion.openHRegion(this.rootDir, ri, td, wal, customConf, rsServices, null);
    int writtenRowCount = 10;
    List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
    for (int i = 0; i < writtenRowCount; i++) {
      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
        Bytes.toBytes("val"));
      region.put(put);
    }

    // Now assert edits made it in.
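    // Count the rows with a scan; every put above should still be visible in the memstore.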
    RegionScanner scanner = region.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));

    // Let us flush the region
    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
    try {
      region.flush(true);
      fail("Injected exception hasn't been thrown");
    } catch (IOException e) {
      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
      // simulated to abort server
      Mockito.doReturn(true).when(rsServices).isAborted();
      // region normally does not accept writes after DroppedSnapshotException;
      // we mock around it for this test
      region.setClosing(false);
    }
    // Write more data
    int moreRow = 10;
    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
        Bytes.toBytes("val"));
      region.put(put);
    }
    writtenRowCount += moreRow;
    // Call flush again
    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
    try {
      region.flush(true);
    } catch (IOException t) {
      LOG.info("Expected exception when flushing region because server is stopped, {}",
        t.getMessage());
    }

    region.close(true);
    wal.shutdown();

    // Let us try to split and recover
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    Mockito.doReturn(false).when(rsServices).isAborted();
    HRegion region2 = HRegion.openHRegion(this.rootDir, ri, td, wal2, this.conf, rsServices, null);
    scanner = region2.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));
  }

  private int getScannedCount(RegionScanner scanner) throws IOException {
    int scannedCount = 0;
    List<Cell> results = new ArrayList<>();
    while (true) {
      boolean existMore = scanner.next(results);
      if (!results.isEmpty()) {
        scannedCount++;
      }
      if (!existMore) {
        break;
      }
      results.clear();
    }
    return scannedCount;
  }

  private void writeCorruptRecoveredHFile(Path recoveredHFile) throws Exception {
    // Read the recovered hfile
    int fileSize = (int) fs.listStatus(recoveredHFile)[0].getLen();
    FSDataInputStream in = fs.open(recoveredHFile);
    byte[] fileContent = new byte[fileSize];
    in.readFully(0, fileContent, 0, fileSize);
    in.close();

    // Write a corrupt hfile by appending garbage
    Path path = new Path(recoveredHFile.getParent(), recoveredHFile.getName() + ".corrupt");
    FSDataOutputStream out = fs.create(path);
    out.write(fileContent);
    out.write(Bytes.toBytes("-----"));
    out.close();
  }
}