001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.FileNotFoundException; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.OutputStream; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Comparator; 031import java.util.HashSet; 032import java.util.Set; 033import java.util.concurrent.atomic.AtomicLong; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FileStatus; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.procedure2.Procedure; 042import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 043import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 044import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter; 045import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; 046import org.apache.hadoop.hbase.procedure2.SequentialProcedure; 047import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery; 048import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 049import org.apache.hadoop.hbase.testclassification.MasterTests; 050import org.apache.hadoop.hbase.testclassification.SmallTests; 051import org.apache.hadoop.io.IOUtils; 052import org.junit.After; 053import org.junit.Before; 054import org.junit.ClassRule; 055import org.junit.Test; 056import org.junit.experimental.categories.Category; 057import org.mockito.Mockito; 058import org.mockito.invocation.InvocationOnMock; 059import org.mockito.stubbing.Answer; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value; 064 065@Category({ MasterTests.class, SmallTests.class }) 066public class TestWALProcedureStore { 067 @ClassRule 068 public static final HBaseClassTestRule CLASS_RULE = 069 HBaseClassTestRule.forClass(TestWALProcedureStore.class); 070 071 private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class); 072 073 private static final int PROCEDURE_STORE_SLOTS = 1; 074 075 private WALProcedureStore procStore; 076 077 private final HBaseCommonTestingUtility htu = new HBaseCommonTestingUtility(); 078 private FileSystem fs; 079 private Path testDir; 080 private Path logDir; 081 082 private void setupConfig(final Configuration conf) { 083 conf.setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true); 084 } 085 086 @Before 087 public void setUp() throws IOException { 088 testDir = htu.getDataTestDir(); 089 htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString()); 090 fs = testDir.getFileSystem(htu.getConfiguration()); 091 htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString()); 092 assertTrue(testDir.depth() > 1); 093 094 TestSequentialProcedure.seqId.set(0); 095 setupConfig(htu.getConfiguration()); 096 logDir = new Path(testDir, "proc-logs"); 097 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 098 procStore.start(PROCEDURE_STORE_SLOTS); 099 procStore.recoverLease(); 100 procStore.load(new LoadCounter()); 101 } 102 103 @After 104 public void tearDown() throws IOException { 105 procStore.stop(false); 106 fs.delete(logDir, true); 107 } 108 109 private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception { 110 ProcedureTestingUtility.storeRestart(procStore, loader); 111 } 112 113 @Test 114 public void testEmptyRoll() throws Exception { 115 for (int i = 0; i < 10; ++i) { 116 procStore.periodicRollForTesting(); 117 } 118 assertEquals(1, procStore.getActiveLogs().size()); 119 FileStatus[] status = fs.listStatus(logDir); 120 assertEquals(1, status.length); 121 } 122 123 @Test 124 public void testRestartWithoutData() throws Exception { 125 for (int i = 0; i < 10; ++i) { 126 final LoadCounter loader = new LoadCounter(); 127 storeRestart(loader); 128 } 129 LOG.info("ACTIVE WALs " + procStore.getActiveLogs()); 130 assertEquals(1, procStore.getActiveLogs().size()); 131 FileStatus[] status = fs.listStatus(logDir); 132 assertEquals(1, status.length); 133 } 134 135 /** 136 * Tests that tracker for all old logs are loaded back after procedure store is restarted. 137 */ 138 @Test 139 public void trackersLoadedForAllOldLogs() throws Exception { 140 for (int i = 0; i <= 20; ++i) { 141 procStore.insert(new TestProcedure(i), null); 142 if (i > 0 && (i % 5) == 0) { 143 LoadCounter loader = new LoadCounter(); 144 storeRestart(loader); 145 } 146 } 147 assertEquals(5, procStore.getActiveLogs().size()); 148 for (int i = 0; i < procStore.getActiveLogs().size() - 1; ++i) { 149 ProcedureStoreTracker tracker = procStore.getActiveLogs().get(i).getTracker(); 150 assertTrue(tracker != null && !tracker.isEmpty()); 151 } 152 } 153 154 @Test 155 public void testWalCleanerSequentialClean() throws Exception { 156 final Procedure<?>[] procs = new Procedure[5]; 157 ArrayList<ProcedureWALFile> logs = null; 158 159 // Insert procedures and roll wal after every insert. 160 for (int i = 0; i < procs.length; i++) { 161 procs[i] = new TestSequentialProcedure(); 162 procStore.insert(procs[i], null); 163 procStore.rollWriterForTesting(); 164 logs = procStore.getActiveLogs(); 165 assertEquals(logs.size(), i + 2); // Extra 1 for current ongoing wal. 166 } 167 168 // Delete procedures in sequential order make sure that only the corresponding wal is deleted 169 // from logs list. 170 final int[] deleteOrder = new int[] { 0, 1, 2, 3, 4 }; 171 for (int i = 0; i < deleteOrder.length; i++) { 172 procStore.delete(procs[deleteOrder[i]].getProcId()); 173 procStore.removeInactiveLogsForTesting(); 174 assertFalse(logs.get(deleteOrder[i]).toString(), 175 procStore.getActiveLogs().contains(logs.get(deleteOrder[i]))); 176 assertEquals(procStore.getActiveLogs().size(), procs.length - i); 177 } 178 } 179 180 // Test that wal cleaner doesn't create holes in wal files list i.e. it only deletes files if 181 // they are in the starting of the list. 182 @Test 183 public void testWalCleanerNoHoles() throws Exception { 184 final Procedure<?>[] procs = new Procedure[5]; 185 ArrayList<ProcedureWALFile> logs = null; 186 // Insert procedures and roll wal after every insert. 187 for (int i = 0; i < procs.length; i++) { 188 procs[i] = new TestSequentialProcedure(); 189 procStore.insert(procs[i], null); 190 procStore.rollWriterForTesting(); 191 logs = procStore.getActiveLogs(); 192 assertEquals(i + 2, logs.size()); // Extra 1 for current ongoing wal. 193 } 194 195 for (int i = 1; i < procs.length; i++) { 196 procStore.delete(procs[i].getProcId()); 197 } 198 assertEquals(procs.length + 1, procStore.getActiveLogs().size()); 199 procStore.delete(procs[0].getProcId()); 200 assertEquals(1, procStore.getActiveLogs().size()); 201 } 202 203 @Test 204 public void testWalCleanerUpdates() throws Exception { 205 TestSequentialProcedure p1 = new TestSequentialProcedure(); 206 TestSequentialProcedure p2 = new TestSequentialProcedure(); 207 procStore.insert(p1, null); 208 procStore.insert(p2, null); 209 procStore.rollWriterForTesting(); 210 ProcedureWALFile firstLog = procStore.getActiveLogs().get(0); 211 procStore.update(p1); 212 procStore.rollWriterForTesting(); 213 procStore.update(p2); 214 procStore.rollWriterForTesting(); 215 procStore.removeInactiveLogsForTesting(); 216 assertFalse(procStore.getActiveLogs().contains(firstLog)); 217 } 218 219 @Test 220 public void testWalCleanerUpdatesDontLeaveHoles() throws Exception { 221 TestSequentialProcedure p1 = new TestSequentialProcedure(); 222 TestSequentialProcedure p2 = new TestSequentialProcedure(); 223 procStore.insert(p1, null); 224 procStore.insert(p2, null); 225 procStore.rollWriterForTesting(); // generates first log with p1 + p2 226 ProcedureWALFile log1 = procStore.getActiveLogs().get(0); 227 procStore.update(p2); 228 procStore.rollWriterForTesting(); // generates second log with p2 229 ProcedureWALFile log2 = procStore.getActiveLogs().get(1); 230 procStore.update(p2); 231 procStore.rollWriterForTesting(); // generates third log with p2 232 procStore.removeInactiveLogsForTesting(); // Shouldn't remove 2nd log. 233 assertEquals(4, procStore.getActiveLogs().size()); 234 procStore.update(p1); 235 procStore.rollWriterForTesting(); // generates fourth log with p1 236 procStore.removeInactiveLogsForTesting(); // Should remove first two logs. 237 assertEquals(3, procStore.getActiveLogs().size()); 238 assertFalse(procStore.getActiveLogs().contains(log1)); 239 assertFalse(procStore.getActiveLogs().contains(log2)); 240 } 241 242 @Test 243 public void testWalCleanerWithEmptyRolls() throws Exception { 244 final Procedure<?>[] procs = new Procedure[3]; 245 for (int i = 0; i < procs.length; ++i) { 246 procs[i] = new TestSequentialProcedure(); 247 procStore.insert(procs[i], null); 248 } 249 assertEquals(1, procStore.getActiveLogs().size()); 250 procStore.rollWriterForTesting(); 251 assertEquals(2, procStore.getActiveLogs().size()); 252 procStore.rollWriterForTesting(); 253 assertEquals(3, procStore.getActiveLogs().size()); 254 255 for (int i = 0; i < procs.length; ++i) { 256 procStore.update(procs[i]); 257 procStore.rollWriterForTesting(); 258 procStore.rollWriterForTesting(); 259 if (i < (procs.length - 1)) { 260 assertEquals(3 + ((i + 1) * 2), procStore.getActiveLogs().size()); 261 } 262 } 263 assertEquals(7, procStore.getActiveLogs().size()); 264 265 for (int i = 0; i < procs.length; ++i) { 266 procStore.delete(procs[i].getProcId()); 267 assertEquals(7 - ((i + 1) * 2), procStore.getActiveLogs().size()); 268 } 269 assertEquals(1, procStore.getActiveLogs().size()); 270 } 271 272 @Test 273 public void testEmptyLogLoad() throws Exception { 274 LoadCounter loader = new LoadCounter(); 275 storeRestart(loader); 276 assertEquals(0, loader.getMaxProcId()); 277 assertEquals(0, loader.getLoadedCount()); 278 assertEquals(0, loader.getCorruptedCount()); 279 } 280 281 @Test 282 public void testLoad() throws Exception { 283 Set<Long> procIds = new HashSet<>(); 284 285 // Insert something in the log 286 Procedure<?> proc1 = new TestSequentialProcedure(); 287 procIds.add(proc1.getProcId()); 288 procStore.insert(proc1, null); 289 290 Procedure<?> proc2 = new TestSequentialProcedure(); 291 Procedure<?>[] child2 = new Procedure[2]; 292 child2[0] = new TestSequentialProcedure(); 293 child2[1] = new TestSequentialProcedure(); 294 295 procIds.add(proc2.getProcId()); 296 procIds.add(child2[0].getProcId()); 297 procIds.add(child2[1].getProcId()); 298 procStore.insert(proc2, child2); 299 300 // Verify that everything is there 301 verifyProcIdsOnRestart(procIds); 302 303 // Update and delete something 304 procStore.update(proc1); 305 procStore.update(child2[1]); 306 procStore.delete(child2[1].getProcId()); 307 procIds.remove(child2[1].getProcId()); 308 309 // Verify that everything is there 310 verifyProcIdsOnRestart(procIds); 311 312 // Remove 4 byte from the trailers 313 procStore.stop(false); 314 FileStatus[] logs = fs.listStatus(logDir); 315 assertEquals(3, logs.length); 316 for (int i = 0; i < logs.length; ++i) { 317 corruptLog(logs[i], 4); 318 } 319 verifyProcIdsOnRestart(procIds); 320 } 321 322 @Test 323 public void testNoTrailerDoubleRestart() throws Exception { 324 // log-0001: proc 0, 1 and 2 are inserted 325 Procedure<?> proc0 = new TestSequentialProcedure(); 326 procStore.insert(proc0, null); 327 Procedure<?> proc1 = new TestSequentialProcedure(); 328 procStore.insert(proc1, null); 329 Procedure<?> proc2 = new TestSequentialProcedure(); 330 procStore.insert(proc2, null); 331 procStore.rollWriterForTesting(); 332 333 // log-0002: proc 1 deleted 334 procStore.delete(proc1.getProcId()); 335 procStore.rollWriterForTesting(); 336 337 // log-0003: proc 2 is update 338 procStore.update(proc2); 339 procStore.rollWriterForTesting(); 340 341 // log-0004: proc 2 deleted 342 procStore.delete(proc2.getProcId()); 343 344 // stop the store and remove the trailer 345 procStore.stop(false); 346 FileStatus[] logs = fs.listStatus(logDir); 347 assertEquals(4, logs.length); 348 for (int i = 0; i < logs.length; ++i) { 349 corruptLog(logs[i], 4); 350 } 351 352 // Test Load 1 353 // Restart the store (avoid cleaning up the files, to check the rebuilded trackers) 354 htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false); 355 LoadCounter loader = new LoadCounter(); 356 storeRestart(loader); 357 assertEquals(1, loader.getLoadedCount()); 358 assertEquals(0, loader.getCorruptedCount()); 359 360 // Test Load 2 361 assertEquals(5, fs.listStatus(logDir).length); 362 loader = new LoadCounter(); 363 storeRestart(loader); 364 assertEquals(1, loader.getLoadedCount()); 365 assertEquals(0, loader.getCorruptedCount()); 366 367 // remove proc-0 368 procStore.delete(proc0.getProcId()); 369 procStore.periodicRollForTesting(); 370 assertEquals(1, fs.listStatus(logDir).length); 371 storeRestart(loader); 372 } 373 374 @Test 375 public void testProcIdHoles() throws Exception { 376 // Insert 377 for (int i = 0; i < 100; i += 2) { 378 procStore.insert(new TestProcedure(i), null); 379 if (i > 0 && (i % 10) == 0) { 380 LoadCounter loader = new LoadCounter(); 381 storeRestart(loader); 382 assertEquals(0, loader.getCorruptedCount()); 383 assertEquals((i / 2) + 1, loader.getLoadedCount()); 384 } 385 } 386 assertEquals(10, procStore.getActiveLogs().size()); 387 388 // Delete 389 for (int i = 0; i < 100; i += 2) { 390 procStore.delete(i); 391 } 392 assertEquals(1, procStore.getActiveLogs().size()); 393 394 LoadCounter loader = new LoadCounter(); 395 storeRestart(loader); 396 assertEquals(0, loader.getLoadedCount()); 397 assertEquals(0, loader.getCorruptedCount()); 398 } 399 400 @Test 401 public void testCorruptedTrailer() throws Exception { 402 // Insert something 403 for (int i = 0; i < 100; ++i) { 404 procStore.insert(new TestSequentialProcedure(), null); 405 } 406 407 // Stop the store 408 procStore.stop(false); 409 410 // Remove 4 byte from the trailer 411 FileStatus[] logs = fs.listStatus(logDir); 412 assertEquals(1, logs.length); 413 corruptLog(logs[0], 4); 414 415 LoadCounter loader = new LoadCounter(); 416 storeRestart(loader); 417 assertEquals(100, loader.getLoadedCount()); 418 assertEquals(0, loader.getCorruptedCount()); 419 } 420 421 private static void assertUpdated(final ProcedureStoreTracker tracker, final Procedure<?>[] procs, 422 final int[] updatedProcs, final int[] nonUpdatedProcs) { 423 for (int index : updatedProcs) { 424 long procId = procs[index].getProcId(); 425 assertTrue("Procedure id : " + procId, tracker.isModified(procId)); 426 } 427 for (int index : nonUpdatedProcs) { 428 long procId = procs[index].getProcId(); 429 assertFalse("Procedure id : " + procId, tracker.isModified(procId)); 430 } 431 } 432 433 private static void assertDeleted(final ProcedureStoreTracker tracker, final Procedure<?>[] procs, 434 final int[] deletedProcs, final int[] nonDeletedProcs) { 435 for (int index : deletedProcs) { 436 long procId = procs[index].getProcId(); 437 assertEquals("Procedure id : " + procId, ProcedureStoreTracker.DeleteState.YES, 438 tracker.isDeleted(procId)); 439 } 440 for (int index : nonDeletedProcs) { 441 long procId = procs[index].getProcId(); 442 assertEquals("Procedure id : " + procId, ProcedureStoreTracker.DeleteState.NO, 443 tracker.isDeleted(procId)); 444 } 445 } 446 447 @Test 448 public void testCorruptedTrailersRebuild() throws Exception { 449 final Procedure<?>[] procs = new Procedure[6]; 450 for (int i = 0; i < procs.length; ++i) { 451 procs[i] = new TestSequentialProcedure(); 452 } 453 // Log State (I=insert, U=updated, D=delete) 454 // | log 1 | log 2 | log 3 | 455 // 0 | I, D | | | 456 // 1 | I | | | 457 // 2 | I | D | | 458 // 3 | I | U | | 459 // 4 | | I | D | 460 // 5 | | | I | 461 procStore.insert(procs[0], null); 462 procStore.insert(procs[1], null); 463 procStore.insert(procs[2], null); 464 procStore.insert(procs[3], null); 465 procStore.delete(procs[0].getProcId()); 466 procStore.rollWriterForTesting(); 467 procStore.delete(procs[2].getProcId()); 468 procStore.update(procs[3]); 469 procStore.insert(procs[4], null); 470 procStore.rollWriterForTesting(); 471 procStore.delete(procs[4].getProcId()); 472 procStore.insert(procs[5], null); 473 474 // Stop the store 475 procStore.stop(false); 476 477 // Remove 4 byte from the trailers 478 final FileStatus[] logs = fs.listStatus(logDir); 479 assertEquals(3, logs.length); 480 for (int i = 0; i < logs.length; ++i) { 481 corruptLog(logs[i], 4); 482 } 483 484 // Restart the store (avoid cleaning up the files, to check the rebuilded trackers) 485 htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false); 486 final LoadCounter loader = new LoadCounter(); 487 storeRestart(loader); 488 assertEquals(3, loader.getLoadedCount()); // procs 1, 3 and 5 489 assertEquals(0, loader.getCorruptedCount()); 490 491 // Check the Trackers 492 final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs(); 493 LOG.info("WALs " + walFiles); 494 assertEquals(4, walFiles.size()); 495 LOG.info("Checking wal " + walFiles.get(0)); 496 assertUpdated(walFiles.get(0).getTracker(), procs, new int[] { 0, 1, 2, 3 }, 497 new int[] { 4, 5 }); 498 LOG.info("Checking wal " + walFiles.get(1)); 499 assertUpdated(walFiles.get(1).getTracker(), procs, new int[] { 2, 3, 4 }, 500 new int[] { 0, 1, 5 }); 501 LOG.info("Checking wal " + walFiles.get(2)); 502 assertUpdated(walFiles.get(2).getTracker(), procs, new int[] { 4, 5 }, 503 new int[] { 0, 1, 2, 3 }); 504 LOG.info("Checking global tracker "); 505 assertDeleted(procStore.getStoreTracker(), procs, new int[] { 0, 2, 4 }, new int[] { 1, 3, 5 }); 506 } 507 508 @Test 509 public void testCorruptedEntries() throws Exception { 510 // Insert something 511 for (int i = 0; i < 100; ++i) { 512 procStore.insert(new TestSequentialProcedure(), null); 513 } 514 515 // Stop the store 516 procStore.stop(false); 517 518 // Remove some byte from the log 519 // (enough to cut the trailer and corrupt some entries) 520 FileStatus[] logs = fs.listStatus(logDir); 521 assertEquals(1, logs.length); 522 corruptLog(logs[0], 1823); 523 524 LoadCounter loader = new LoadCounter(); 525 storeRestart(loader); 526 assertTrue(procStore.getCorruptedLogs() != null); 527 assertEquals(1, procStore.getCorruptedLogs().size()); 528 assertEquals(87, loader.getLoadedCount()); 529 assertEquals(0, loader.getCorruptedCount()); 530 } 531 532 @Test 533 public void testCorruptedProcedures() throws Exception { 534 // Insert root-procedures 535 TestProcedure[] rootProcs = new TestProcedure[10]; 536 for (int i = 1; i <= rootProcs.length; i++) { 537 rootProcs[i - 1] = new TestProcedure(i, 0); 538 procStore.insert(rootProcs[i - 1], null); 539 rootProcs[i - 1].addStackId(0); 540 procStore.update(rootProcs[i - 1]); 541 } 542 // insert root-child txn 543 procStore.rollWriterForTesting(); 544 for (int i = 1; i <= rootProcs.length; i++) { 545 TestProcedure b = new TestProcedure(rootProcs.length + i, i); 546 rootProcs[i - 1].addStackId(1); 547 procStore.insert(rootProcs[i - 1], new Procedure[] { b }); 548 } 549 // insert child updates 550 procStore.rollWriterForTesting(); 551 for (int i = 1; i <= rootProcs.length; i++) { 552 procStore.update(new TestProcedure(rootProcs.length + i, i)); 553 } 554 555 // Stop the store 556 procStore.stop(false); 557 558 // the first log was removed, 559 // we have insert-txn and updates in the others so everything is fine 560 FileStatus[] logs = fs.listStatus(logDir); 561 assertEquals(Arrays.toString(logs), 2, logs.length); 562 Arrays.sort(logs, new Comparator<FileStatus>() { 563 @Override 564 public int compare(FileStatus o1, FileStatus o2) { 565 return o1.getPath().getName().compareTo(o2.getPath().getName()); 566 } 567 }); 568 569 LoadCounter loader = new LoadCounter(); 570 storeRestart(loader); 571 assertEquals(rootProcs.length * 2, loader.getLoadedCount()); 572 assertEquals(0, loader.getCorruptedCount()); 573 574 // Remove the second log, we have lost all the root/parent references 575 fs.delete(logs[0].getPath(), false); 576 loader.reset(); 577 storeRestart(loader); 578 assertEquals(0, loader.getLoadedCount()); 579 assertEquals(rootProcs.length, loader.getCorruptedCount()); 580 for (Procedure<?> proc : loader.getCorrupted()) { 581 assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length); 582 assertTrue(proc.toString(), 583 proc.getProcId() > rootProcs.length && proc.getProcId() <= (rootProcs.length * 2)); 584 } 585 } 586 587 @Test 588 public void testRollAndRemove() throws IOException { 589 // Insert something in the log 590 Procedure<?> proc1 = new TestSequentialProcedure(); 591 procStore.insert(proc1, null); 592 593 Procedure<?> proc2 = new TestSequentialProcedure(); 594 procStore.insert(proc2, null); 595 596 // roll the log, now we have 2 597 procStore.rollWriterForTesting(); 598 assertEquals(2, procStore.getActiveLogs().size()); 599 600 // everything will be up to date in the second log 601 // so we can remove the first one 602 procStore.update(proc1); 603 procStore.update(proc2); 604 assertEquals(1, procStore.getActiveLogs().size()); 605 606 // roll the log, now we have 2 607 procStore.rollWriterForTesting(); 608 assertEquals(2, procStore.getActiveLogs().size()); 609 610 // remove everything active 611 // so we can remove all the logs 612 procStore.delete(proc1.getProcId()); 613 procStore.delete(proc2.getProcId()); 614 assertEquals(1, procStore.getActiveLogs().size()); 615 } 616 617 @Test 618 public void testFileNotFoundDuringLeaseRecovery() throws IOException { 619 final TestProcedure[] procs = new TestProcedure[3]; 620 for (int i = 0; i < procs.length; ++i) { 621 procs[i] = new TestProcedure(i + 1, 0); 622 procStore.insert(procs[i], null); 623 } 624 procStore.rollWriterForTesting(); 625 for (int i = 0; i < procs.length; ++i) { 626 procStore.update(procs[i]); 627 procStore.rollWriterForTesting(); 628 } 629 procStore.stop(false); 630 631 FileStatus[] status = fs.listStatus(logDir); 632 assertEquals(procs.length + 1, status.length); 633 634 // simulate another active master removing the wals 635 procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null, new LeaseRecovery() { 636 private int count = 0; 637 638 @Override 639 public void recoverFileLease(FileSystem fs, Path path) throws IOException { 640 if (++count <= 2) { 641 fs.delete(path, false); 642 LOG.debug("Simulate FileNotFound at count=" + count + " for " + path); 643 throw new FileNotFoundException("test file not found " + path); 644 } 645 LOG.debug("Simulate recoverFileLease() at count=" + count + " for " + path); 646 } 647 }); 648 649 final LoadCounter loader = new LoadCounter(); 650 procStore.start(PROCEDURE_STORE_SLOTS); 651 procStore.recoverLease(); 652 procStore.load(loader); 653 assertEquals(procs.length, loader.getMaxProcId()); 654 assertEquals(1, loader.getRunnableCount()); 655 assertEquals(0, loader.getCompletedCount()); 656 assertEquals(0, loader.getCorruptedCount()); 657 } 658 659 @Test 660 public void testLogFileAlreadyExists() throws IOException { 661 final boolean[] tested = { false }; 662 WALProcedureStore mStore = Mockito.spy(procStore); 663 664 Answer<Boolean> ans = new Answer<Boolean>() { 665 @Override 666 public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable { 667 long logId = ((Long) invocationOnMock.getArgument(0)).longValue(); 668 switch ((int) logId) { 669 case 2: 670 // Create a file so that real rollWriter() runs into file exists condition 671 Path logFilePath = mStore.getLogFilePath(logId); 672 mStore.getFileSystem().create(logFilePath); 673 break; 674 case 3: 675 // Success only when we retry with logId 3 676 tested[0] = true; 677 default: 678 break; 679 } 680 return (Boolean) invocationOnMock.callRealMethod(); 681 } 682 }; 683 684 // First time Store has one log file, next id will be 2 685 Mockito.doAnswer(ans).when(mStore).rollWriter(2); 686 // next time its 3 687 Mockito.doAnswer(ans).when(mStore).rollWriter(3); 688 689 mStore.recoverLease(); 690 assertTrue(tested[0]); 691 } 692 693 @Test 694 public void testLoadChildren() throws Exception { 695 TestProcedure a = new TestProcedure(1, 0); 696 TestProcedure b = new TestProcedure(2, 1); 697 TestProcedure c = new TestProcedure(3, 1); 698 699 // INIT 700 procStore.insert(a, null); 701 702 // Run A first step 703 a.addStackId(0); 704 procStore.update(a); 705 706 // Run A second step 707 a.addStackId(1); 708 procStore.insert(a, new Procedure[] { b, c }); 709 710 // Run B first step 711 b.addStackId(2); 712 procStore.update(b); 713 714 // Run C first and last step 715 c.addStackId(3); 716 procStore.update(c); 717 718 // Run B second setp 719 b.addStackId(4); 720 procStore.update(b); 721 722 // back to A 723 a.addStackId(5); 724 a.setSuccessState(); 725 procStore.delete(a, new long[] { b.getProcId(), c.getProcId() }); 726 restartAndAssert(3, 0, 1, 0); 727 } 728 729 @Test 730 public void testBatchDelete() throws Exception { 731 for (int i = 1; i < 10; ++i) { 732 procStore.insert(new TestProcedure(i), null); 733 } 734 735 // delete nothing 736 long[] toDelete = new long[] { 1, 2, 3, 4 }; 737 procStore.delete(toDelete, 2, 0); 738 LoadCounter loader = restartAndAssert(9, 9, 0, 0); 739 for (int i = 1; i < 10; ++i) { 740 assertEquals(true, loader.isRunnable(i)); 741 } 742 743 // delete the full "toDelete" array (2, 4, 6, 8) 744 toDelete = new long[] { 2, 4, 6, 8 }; 745 procStore.delete(toDelete, 0, toDelete.length); 746 loader = restartAndAssert(9, 5, 0, 0); 747 for (int i = 1; i < 10; ++i) { 748 assertEquals(i % 2 != 0, loader.isRunnable(i)); 749 } 750 751 // delete a slice of "toDelete" (1, 3) 752 toDelete = new long[] { 5, 7, 1, 3, 9 }; 753 procStore.delete(toDelete, 2, 2); 754 loader = restartAndAssert(9, 3, 0, 0); 755 for (int i = 1; i < 10; ++i) { 756 assertEquals(i > 3 && i % 2 != 0, loader.isRunnable(i)); 757 } 758 759 // delete a single item (5) 760 toDelete = new long[] { 5 }; 761 procStore.delete(toDelete, 0, 1); 762 loader = restartAndAssert(9, 2, 0, 0); 763 for (int i = 1; i < 10; ++i) { 764 assertEquals(i > 5 && i % 2 != 0, loader.isRunnable(i)); 765 } 766 767 // delete remaining using a slice of "toDelete" (7, 9) 768 toDelete = new long[] { 0, 7, 9 }; 769 procStore.delete(toDelete, 1, 2); 770 loader = restartAndAssert(0, 0, 0, 0); 771 for (int i = 1; i < 10; ++i) { 772 assertEquals(false, loader.isRunnable(i)); 773 } 774 } 775 776 @Test 777 public void testBatchInsert() throws Exception { 778 final int count = 10; 779 final TestProcedure[] procs = new TestProcedure[count]; 780 for (int i = 0; i < procs.length; ++i) { 781 procs[i] = new TestProcedure(i + 1); 782 } 783 procStore.insert(procs); 784 restartAndAssert(count, count, 0, 0); 785 786 for (int i = 0; i < procs.length; ++i) { 787 final long procId = procs[i].getProcId(); 788 procStore.delete(procId); 789 restartAndAssert(procId != count ? count : 0, count - (i + 1), 0, 0); 790 } 791 procStore.removeInactiveLogsForTesting(); 792 assertEquals("WALs=" + procStore.getActiveLogs(), 1, procStore.getActiveLogs().size()); 793 } 794 795 @Test 796 public void testWALDirAndWALArchiveDir() throws IOException { 797 Configuration conf = htu.getConfiguration(); 798 procStore = createWALProcedureStore(conf); 799 assertEquals(procStore.getFileSystem(), procStore.getWalArchiveDir().getFileSystem(conf)); 800 } 801 802 private WALProcedureStore createWALProcedureStore(Configuration conf) throws IOException { 803 return new WALProcedureStore(conf, new LeaseRecovery() { 804 @Override 805 public void recoverFileLease(FileSystem fs, Path path) throws IOException { 806 // no-op 807 } 808 }); 809 } 810 811 private LoadCounter restartAndAssert(long maxProcId, long runnableCount, int completedCount, 812 int corruptedCount) throws Exception { 813 return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId, runnableCount, 814 completedCount, corruptedCount); 815 } 816 817 private void corruptLog(final FileStatus logFile, final long dropBytes) throws IOException { 818 assertTrue(logFile.getLen() > dropBytes); 819 LOG.debug( 820 "corrupt log " + logFile.getPath() + " size=" + logFile.getLen() + " drop=" + dropBytes); 821 Path tmpPath = new Path(testDir, "corrupted.log"); 822 InputStream in = fs.open(logFile.getPath()); 823 OutputStream out = fs.create(tmpPath); 824 IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true); 825 if (!fs.rename(tmpPath, logFile.getPath())) { 826 throw new IOException("Unable to rename"); 827 } 828 } 829 830 private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception { 831 LOG.debug("expected: " + procIds); 832 LoadCounter loader = new LoadCounter(); 833 storeRestart(loader); 834 assertEquals(procIds.size(), loader.getLoadedCount()); 835 assertEquals(0, loader.getCorruptedCount()); 836 } 837 838 public static class TestSequentialProcedure extends SequentialProcedure<Void> { 839 840 private static final AtomicLong seqId = new AtomicLong(0); 841 842 public TestSequentialProcedure() { 843 setProcId(seqId.incrementAndGet()); 844 } 845 846 @Override 847 protected Procedure<Void>[] execute(Void env) { 848 return null; 849 } 850 851 @Override 852 protected void rollback(Void env) { 853 } 854 855 @Override 856 protected boolean abort(Void env) { 857 return false; 858 } 859 860 @Override 861 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 862 long procId = getProcId(); 863 if (procId % 2 == 0) { 864 Int64Value.Builder builder = Int64Value.newBuilder().setValue(procId); 865 serializer.serialize(builder.build()); 866 } 867 } 868 869 @Override 870 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 871 long procId = getProcId(); 872 if (procId % 2 == 0) { 873 Int64Value value = serializer.deserialize(Int64Value.class); 874 assertEquals(procId, value.getValue()); 875 } 876 } 877 } 878}