001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.FileNotFoundException;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.OutputStream;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Comparator;
031import java.util.HashSet;
032import java.util.Set;
033import java.util.concurrent.atomic.AtomicLong;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileStatus;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.procedure2.Procedure;
042import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
043import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
044import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
045import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
046import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
047import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
048import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
049import org.apache.hadoop.hbase.testclassification.MasterTests;
050import org.apache.hadoop.hbase.testclassification.SmallTests;
051import org.apache.hadoop.io.IOUtils;
052import org.junit.After;
053import org.junit.Before;
054import org.junit.ClassRule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057import org.mockito.Mockito;
058import org.mockito.invocation.InvocationOnMock;
059import org.mockito.stubbing.Answer;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;
064
065@Category({ MasterTests.class, SmallTests.class })
066public class TestWALProcedureStore {
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069    HBaseClassTestRule.forClass(TestWALProcedureStore.class);
070
071  private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class);
072
073  private static final int PROCEDURE_STORE_SLOTS = 1;
074
075  private WALProcedureStore procStore;
076
077  private final HBaseCommonTestingUtility htu = new HBaseCommonTestingUtility();
078  private FileSystem fs;
079  private Path testDir;
080  private Path logDir;
081
082  private void setupConfig(final Configuration conf) {
083    conf.setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true);
084  }
085
086  @Before
087  public void setUp() throws IOException {
088    testDir = htu.getDataTestDir();
089    htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString());
090    fs = testDir.getFileSystem(htu.getConfiguration());
091    htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString());
092    assertTrue(testDir.depth() > 1);
093
094    TestSequentialProcedure.seqId.set(0);
095    setupConfig(htu.getConfiguration());
096    logDir = new Path(testDir, "proc-logs");
097    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
098    procStore.start(PROCEDURE_STORE_SLOTS);
099    procStore.recoverLease();
100    procStore.load(new LoadCounter());
101  }
102
103  @After
104  public void tearDown() throws IOException {
105    procStore.stop(false);
106    fs.delete(logDir, true);
107  }
108
109  private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception {
110    ProcedureTestingUtility.storeRestart(procStore, loader);
111  }
112
113  @Test
114  public void testEmptyRoll() throws Exception {
115    for (int i = 0; i < 10; ++i) {
116      procStore.periodicRollForTesting();
117    }
118    assertEquals(1, procStore.getActiveLogs().size());
119    FileStatus[] status = fs.listStatus(logDir);
120    assertEquals(1, status.length);
121  }
122
123  @Test
124  public void testRestartWithoutData() throws Exception {
125    for (int i = 0; i < 10; ++i) {
126      final LoadCounter loader = new LoadCounter();
127      storeRestart(loader);
128    }
129    LOG.info("ACTIVE WALs " + procStore.getActiveLogs());
130    assertEquals(1, procStore.getActiveLogs().size());
131    FileStatus[] status = fs.listStatus(logDir);
132    assertEquals(1, status.length);
133  }
134
135  /**
136   * Tests that tracker for all old logs are loaded back after procedure store is restarted.
137   */
138  @Test
139  public void trackersLoadedForAllOldLogs() throws Exception {
140    for (int i = 0; i <= 20; ++i) {
141      procStore.insert(new TestProcedure(i), null);
142      if (i > 0 && (i % 5) == 0) {
143        LoadCounter loader = new LoadCounter();
144        storeRestart(loader);
145      }
146    }
147    assertEquals(5, procStore.getActiveLogs().size());
148    for (int i = 0; i < procStore.getActiveLogs().size() - 1; ++i) {
149      ProcedureStoreTracker tracker = procStore.getActiveLogs().get(i).getTracker();
150      assertTrue(tracker != null && !tracker.isEmpty());
151    }
152  }
153
154  @Test
155  public void testWalCleanerSequentialClean() throws Exception {
156    final Procedure<?>[] procs = new Procedure[5];
157    ArrayList<ProcedureWALFile> logs = null;
158
159    // Insert procedures and roll wal after every insert.
160    for (int i = 0; i < procs.length; i++) {
161      procs[i] = new TestSequentialProcedure();
162      procStore.insert(procs[i], null);
163      procStore.rollWriterForTesting();
164      logs = procStore.getActiveLogs();
165      assertEquals(logs.size(), i + 2); // Extra 1 for current ongoing wal.
166    }
167
168    // Delete procedures in sequential order make sure that only the corresponding wal is deleted
169    // from logs list.
170    final int[] deleteOrder = new int[] { 0, 1, 2, 3, 4 };
171    for (int i = 0; i < deleteOrder.length; i++) {
172      procStore.delete(procs[deleteOrder[i]].getProcId());
173      procStore.removeInactiveLogsForTesting();
174      assertFalse(logs.get(deleteOrder[i]).toString(),
175        procStore.getActiveLogs().contains(logs.get(deleteOrder[i])));
176      assertEquals(procStore.getActiveLogs().size(), procs.length - i);
177    }
178  }
179
180  // Test that wal cleaner doesn't create holes in wal files list i.e. it only deletes files if
181  // they are in the starting of the list.
182  @Test
183  public void testWalCleanerNoHoles() throws Exception {
184    final Procedure<?>[] procs = new Procedure[5];
185    ArrayList<ProcedureWALFile> logs = null;
186    // Insert procedures and roll wal after every insert.
187    for (int i = 0; i < procs.length; i++) {
188      procs[i] = new TestSequentialProcedure();
189      procStore.insert(procs[i], null);
190      procStore.rollWriterForTesting();
191      logs = procStore.getActiveLogs();
192      assertEquals(i + 2, logs.size()); // Extra 1 for current ongoing wal.
193    }
194
195    for (int i = 1; i < procs.length; i++) {
196      procStore.delete(procs[i].getProcId());
197    }
198    assertEquals(procs.length + 1, procStore.getActiveLogs().size());
199    procStore.delete(procs[0].getProcId());
200    assertEquals(1, procStore.getActiveLogs().size());
201  }
202
203  @Test
204  public void testWalCleanerUpdates() throws Exception {
205    TestSequentialProcedure p1 = new TestSequentialProcedure();
206    TestSequentialProcedure p2 = new TestSequentialProcedure();
207    procStore.insert(p1, null);
208    procStore.insert(p2, null);
209    procStore.rollWriterForTesting();
210    ProcedureWALFile firstLog = procStore.getActiveLogs().get(0);
211    procStore.update(p1);
212    procStore.rollWriterForTesting();
213    procStore.update(p2);
214    procStore.rollWriterForTesting();
215    procStore.removeInactiveLogsForTesting();
216    assertFalse(procStore.getActiveLogs().contains(firstLog));
217  }
218
219  @Test
220  public void testWalCleanerUpdatesDontLeaveHoles() throws Exception {
221    TestSequentialProcedure p1 = new TestSequentialProcedure();
222    TestSequentialProcedure p2 = new TestSequentialProcedure();
223    procStore.insert(p1, null);
224    procStore.insert(p2, null);
225    procStore.rollWriterForTesting(); // generates first log with p1 + p2
226    ProcedureWALFile log1 = procStore.getActiveLogs().get(0);
227    procStore.update(p2);
228    procStore.rollWriterForTesting(); // generates second log with p2
229    ProcedureWALFile log2 = procStore.getActiveLogs().get(1);
230    procStore.update(p2);
231    procStore.rollWriterForTesting(); // generates third log with p2
232    procStore.removeInactiveLogsForTesting(); // Shouldn't remove 2nd log.
233    assertEquals(4, procStore.getActiveLogs().size());
234    procStore.update(p1);
235    procStore.rollWriterForTesting(); // generates fourth log with p1
236    procStore.removeInactiveLogsForTesting(); // Should remove first two logs.
237    assertEquals(3, procStore.getActiveLogs().size());
238    assertFalse(procStore.getActiveLogs().contains(log1));
239    assertFalse(procStore.getActiveLogs().contains(log2));
240  }
241
242  @Test
243  public void testWalCleanerWithEmptyRolls() throws Exception {
244    final Procedure<?>[] procs = new Procedure[3];
245    for (int i = 0; i < procs.length; ++i) {
246      procs[i] = new TestSequentialProcedure();
247      procStore.insert(procs[i], null);
248    }
249    assertEquals(1, procStore.getActiveLogs().size());
250    procStore.rollWriterForTesting();
251    assertEquals(2, procStore.getActiveLogs().size());
252    procStore.rollWriterForTesting();
253    assertEquals(3, procStore.getActiveLogs().size());
254
255    for (int i = 0; i < procs.length; ++i) {
256      procStore.update(procs[i]);
257      procStore.rollWriterForTesting();
258      procStore.rollWriterForTesting();
259      if (i < (procs.length - 1)) {
260        assertEquals(3 + ((i + 1) * 2), procStore.getActiveLogs().size());
261      }
262    }
263    assertEquals(7, procStore.getActiveLogs().size());
264
265    for (int i = 0; i < procs.length; ++i) {
266      procStore.delete(procs[i].getProcId());
267      assertEquals(7 - ((i + 1) * 2), procStore.getActiveLogs().size());
268    }
269    assertEquals(1, procStore.getActiveLogs().size());
270  }
271
272  @Test
273  public void testEmptyLogLoad() throws Exception {
274    LoadCounter loader = new LoadCounter();
275    storeRestart(loader);
276    assertEquals(0, loader.getMaxProcId());
277    assertEquals(0, loader.getLoadedCount());
278    assertEquals(0, loader.getCorruptedCount());
279  }
280
281  @Test
282  public void testLoad() throws Exception {
283    Set<Long> procIds = new HashSet<>();
284
285    // Insert something in the log
286    Procedure<?> proc1 = new TestSequentialProcedure();
287    procIds.add(proc1.getProcId());
288    procStore.insert(proc1, null);
289
290    Procedure<?> proc2 = new TestSequentialProcedure();
291    Procedure<?>[] child2 = new Procedure[2];
292    child2[0] = new TestSequentialProcedure();
293    child2[1] = new TestSequentialProcedure();
294
295    procIds.add(proc2.getProcId());
296    procIds.add(child2[0].getProcId());
297    procIds.add(child2[1].getProcId());
298    procStore.insert(proc2, child2);
299
300    // Verify that everything is there
301    verifyProcIdsOnRestart(procIds);
302
303    // Update and delete something
304    procStore.update(proc1);
305    procStore.update(child2[1]);
306    procStore.delete(child2[1].getProcId());
307    procIds.remove(child2[1].getProcId());
308
309    // Verify that everything is there
310    verifyProcIdsOnRestart(procIds);
311
312    // Remove 4 byte from the trailers
313    procStore.stop(false);
314    FileStatus[] logs = fs.listStatus(logDir);
315    assertEquals(3, logs.length);
316    for (int i = 0; i < logs.length; ++i) {
317      corruptLog(logs[i], 4);
318    }
319    verifyProcIdsOnRestart(procIds);
320  }
321
322  @Test
323  public void testNoTrailerDoubleRestart() throws Exception {
324    // log-0001: proc 0, 1 and 2 are inserted
325    Procedure<?> proc0 = new TestSequentialProcedure();
326    procStore.insert(proc0, null);
327    Procedure<?> proc1 = new TestSequentialProcedure();
328    procStore.insert(proc1, null);
329    Procedure<?> proc2 = new TestSequentialProcedure();
330    procStore.insert(proc2, null);
331    procStore.rollWriterForTesting();
332
333    // log-0002: proc 1 deleted
334    procStore.delete(proc1.getProcId());
335    procStore.rollWriterForTesting();
336
337    // log-0003: proc 2 is update
338    procStore.update(proc2);
339    procStore.rollWriterForTesting();
340
341    // log-0004: proc 2 deleted
342    procStore.delete(proc2.getProcId());
343
344    // stop the store and remove the trailer
345    procStore.stop(false);
346    FileStatus[] logs = fs.listStatus(logDir);
347    assertEquals(4, logs.length);
348    for (int i = 0; i < logs.length; ++i) {
349      corruptLog(logs[i], 4);
350    }
351
352    // Test Load 1
353    // Restart the store (avoid cleaning up the files, to check the rebuilded trackers)
354    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
355    LoadCounter loader = new LoadCounter();
356    storeRestart(loader);
357    assertEquals(1, loader.getLoadedCount());
358    assertEquals(0, loader.getCorruptedCount());
359
360    // Test Load 2
361    assertEquals(5, fs.listStatus(logDir).length);
362    loader = new LoadCounter();
363    storeRestart(loader);
364    assertEquals(1, loader.getLoadedCount());
365    assertEquals(0, loader.getCorruptedCount());
366
367    // remove proc-0
368    procStore.delete(proc0.getProcId());
369    procStore.periodicRollForTesting();
370    assertEquals(1, fs.listStatus(logDir).length);
371    storeRestart(loader);
372  }
373
374  @Test
375  public void testProcIdHoles() throws Exception {
376    // Insert
377    for (int i = 0; i < 100; i += 2) {
378      procStore.insert(new TestProcedure(i), null);
379      if (i > 0 && (i % 10) == 0) {
380        LoadCounter loader = new LoadCounter();
381        storeRestart(loader);
382        assertEquals(0, loader.getCorruptedCount());
383        assertEquals((i / 2) + 1, loader.getLoadedCount());
384      }
385    }
386    assertEquals(10, procStore.getActiveLogs().size());
387
388    // Delete
389    for (int i = 0; i < 100; i += 2) {
390      procStore.delete(i);
391    }
392    assertEquals(1, procStore.getActiveLogs().size());
393
394    LoadCounter loader = new LoadCounter();
395    storeRestart(loader);
396    assertEquals(0, loader.getLoadedCount());
397    assertEquals(0, loader.getCorruptedCount());
398  }
399
400  @Test
401  public void testCorruptedTrailer() throws Exception {
402    // Insert something
403    for (int i = 0; i < 100; ++i) {
404      procStore.insert(new TestSequentialProcedure(), null);
405    }
406
407    // Stop the store
408    procStore.stop(false);
409
410    // Remove 4 byte from the trailer
411    FileStatus[] logs = fs.listStatus(logDir);
412    assertEquals(1, logs.length);
413    corruptLog(logs[0], 4);
414
415    LoadCounter loader = new LoadCounter();
416    storeRestart(loader);
417    assertEquals(100, loader.getLoadedCount());
418    assertEquals(0, loader.getCorruptedCount());
419  }
420
421  private static void assertUpdated(final ProcedureStoreTracker tracker, final Procedure<?>[] procs,
422    final int[] updatedProcs, final int[] nonUpdatedProcs) {
423    for (int index : updatedProcs) {
424      long procId = procs[index].getProcId();
425      assertTrue("Procedure id : " + procId, tracker.isModified(procId));
426    }
427    for (int index : nonUpdatedProcs) {
428      long procId = procs[index].getProcId();
429      assertFalse("Procedure id : " + procId, tracker.isModified(procId));
430    }
431  }
432
433  private static void assertDeleted(final ProcedureStoreTracker tracker, final Procedure<?>[] procs,
434    final int[] deletedProcs, final int[] nonDeletedProcs) {
435    for (int index : deletedProcs) {
436      long procId = procs[index].getProcId();
437      assertEquals("Procedure id : " + procId, ProcedureStoreTracker.DeleteState.YES,
438        tracker.isDeleted(procId));
439    }
440    for (int index : nonDeletedProcs) {
441      long procId = procs[index].getProcId();
442      assertEquals("Procedure id : " + procId, ProcedureStoreTracker.DeleteState.NO,
443        tracker.isDeleted(procId));
444    }
445  }
446
447  @Test
448  public void testCorruptedTrailersRebuild() throws Exception {
449    final Procedure<?>[] procs = new Procedure[6];
450    for (int i = 0; i < procs.length; ++i) {
451      procs[i] = new TestSequentialProcedure();
452    }
453    // Log State (I=insert, U=updated, D=delete)
454    // | log 1 | log 2 | log 3 |
455    // 0 | I, D | | |
456    // 1 | I | | |
457    // 2 | I | D | |
458    // 3 | I | U | |
459    // 4 | | I | D |
460    // 5 | | | I |
461    procStore.insert(procs[0], null);
462    procStore.insert(procs[1], null);
463    procStore.insert(procs[2], null);
464    procStore.insert(procs[3], null);
465    procStore.delete(procs[0].getProcId());
466    procStore.rollWriterForTesting();
467    procStore.delete(procs[2].getProcId());
468    procStore.update(procs[3]);
469    procStore.insert(procs[4], null);
470    procStore.rollWriterForTesting();
471    procStore.delete(procs[4].getProcId());
472    procStore.insert(procs[5], null);
473
474    // Stop the store
475    procStore.stop(false);
476
477    // Remove 4 byte from the trailers
478    final FileStatus[] logs = fs.listStatus(logDir);
479    assertEquals(3, logs.length);
480    for (int i = 0; i < logs.length; ++i) {
481      corruptLog(logs[i], 4);
482    }
483
484    // Restart the store (avoid cleaning up the files, to check the rebuilded trackers)
485    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
486    final LoadCounter loader = new LoadCounter();
487    storeRestart(loader);
488    assertEquals(3, loader.getLoadedCount()); // procs 1, 3 and 5
489    assertEquals(0, loader.getCorruptedCount());
490
491    // Check the Trackers
492    final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs();
493    LOG.info("WALs " + walFiles);
494    assertEquals(4, walFiles.size());
495    LOG.info("Checking wal " + walFiles.get(0));
496    assertUpdated(walFiles.get(0).getTracker(), procs, new int[] { 0, 1, 2, 3 },
497      new int[] { 4, 5 });
498    LOG.info("Checking wal " + walFiles.get(1));
499    assertUpdated(walFiles.get(1).getTracker(), procs, new int[] { 2, 3, 4 },
500      new int[] { 0, 1, 5 });
501    LOG.info("Checking wal " + walFiles.get(2));
502    assertUpdated(walFiles.get(2).getTracker(), procs, new int[] { 4, 5 },
503      new int[] { 0, 1, 2, 3 });
504    LOG.info("Checking global tracker ");
505    assertDeleted(procStore.getStoreTracker(), procs, new int[] { 0, 2, 4 }, new int[] { 1, 3, 5 });
506  }
507
508  @Test
509  public void testCorruptedEntries() throws Exception {
510    // Insert something
511    for (int i = 0; i < 100; ++i) {
512      procStore.insert(new TestSequentialProcedure(), null);
513    }
514
515    // Stop the store
516    procStore.stop(false);
517
518    // Remove some byte from the log
519    // (enough to cut the trailer and corrupt some entries)
520    FileStatus[] logs = fs.listStatus(logDir);
521    assertEquals(1, logs.length);
522    corruptLog(logs[0], 1823);
523
524    LoadCounter loader = new LoadCounter();
525    storeRestart(loader);
526    assertTrue(procStore.getCorruptedLogs() != null);
527    assertEquals(1, procStore.getCorruptedLogs().size());
528    assertEquals(87, loader.getLoadedCount());
529    assertEquals(0, loader.getCorruptedCount());
530  }
531
532  @Test
533  public void testCorruptedProcedures() throws Exception {
534    // Insert root-procedures
535    TestProcedure[] rootProcs = new TestProcedure[10];
536    for (int i = 1; i <= rootProcs.length; i++) {
537      rootProcs[i - 1] = new TestProcedure(i, 0);
538      procStore.insert(rootProcs[i - 1], null);
539      rootProcs[i - 1].addStackId(0);
540      procStore.update(rootProcs[i - 1]);
541    }
542    // insert root-child txn
543    procStore.rollWriterForTesting();
544    for (int i = 1; i <= rootProcs.length; i++) {
545      TestProcedure b = new TestProcedure(rootProcs.length + i, i);
546      rootProcs[i - 1].addStackId(1);
547      procStore.insert(rootProcs[i - 1], new Procedure[] { b });
548    }
549    // insert child updates
550    procStore.rollWriterForTesting();
551    for (int i = 1; i <= rootProcs.length; i++) {
552      procStore.update(new TestProcedure(rootProcs.length + i, i));
553    }
554
555    // Stop the store
556    procStore.stop(false);
557
558    // the first log was removed,
559    // we have insert-txn and updates in the others so everything is fine
560    FileStatus[] logs = fs.listStatus(logDir);
561    assertEquals(Arrays.toString(logs), 2, logs.length);
562    Arrays.sort(logs, new Comparator<FileStatus>() {
563      @Override
564      public int compare(FileStatus o1, FileStatus o2) {
565        return o1.getPath().getName().compareTo(o2.getPath().getName());
566      }
567    });
568
569    LoadCounter loader = new LoadCounter();
570    storeRestart(loader);
571    assertEquals(rootProcs.length * 2, loader.getLoadedCount());
572    assertEquals(0, loader.getCorruptedCount());
573
574    // Remove the second log, we have lost all the root/parent references
575    fs.delete(logs[0].getPath(), false);
576    loader.reset();
577    storeRestart(loader);
578    assertEquals(0, loader.getLoadedCount());
579    assertEquals(rootProcs.length, loader.getCorruptedCount());
580    for (Procedure<?> proc : loader.getCorrupted()) {
581      assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length);
582      assertTrue(proc.toString(),
583        proc.getProcId() > rootProcs.length && proc.getProcId() <= (rootProcs.length * 2));
584    }
585  }
586
587  @Test
588  public void testRollAndRemove() throws IOException {
589    // Insert something in the log
590    Procedure<?> proc1 = new TestSequentialProcedure();
591    procStore.insert(proc1, null);
592
593    Procedure<?> proc2 = new TestSequentialProcedure();
594    procStore.insert(proc2, null);
595
596    // roll the log, now we have 2
597    procStore.rollWriterForTesting();
598    assertEquals(2, procStore.getActiveLogs().size());
599
600    // everything will be up to date in the second log
601    // so we can remove the first one
602    procStore.update(proc1);
603    procStore.update(proc2);
604    assertEquals(1, procStore.getActiveLogs().size());
605
606    // roll the log, now we have 2
607    procStore.rollWriterForTesting();
608    assertEquals(2, procStore.getActiveLogs().size());
609
610    // remove everything active
611    // so we can remove all the logs
612    procStore.delete(proc1.getProcId());
613    procStore.delete(proc2.getProcId());
614    assertEquals(1, procStore.getActiveLogs().size());
615  }
616
617  @Test
618  public void testFileNotFoundDuringLeaseRecovery() throws IOException {
619    final TestProcedure[] procs = new TestProcedure[3];
620    for (int i = 0; i < procs.length; ++i) {
621      procs[i] = new TestProcedure(i + 1, 0);
622      procStore.insert(procs[i], null);
623    }
624    procStore.rollWriterForTesting();
625    for (int i = 0; i < procs.length; ++i) {
626      procStore.update(procs[i]);
627      procStore.rollWriterForTesting();
628    }
629    procStore.stop(false);
630
631    FileStatus[] status = fs.listStatus(logDir);
632    assertEquals(procs.length + 1, status.length);
633
634    // simulate another active master removing the wals
635    procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null, new LeaseRecovery() {
636      private int count = 0;
637
638      @Override
639      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
640        if (++count <= 2) {
641          fs.delete(path, false);
642          LOG.debug("Simulate FileNotFound at count=" + count + " for " + path);
643          throw new FileNotFoundException("test file not found " + path);
644        }
645        LOG.debug("Simulate recoverFileLease() at count=" + count + " for " + path);
646      }
647    });
648
649    final LoadCounter loader = new LoadCounter();
650    procStore.start(PROCEDURE_STORE_SLOTS);
651    procStore.recoverLease();
652    procStore.load(loader);
653    assertEquals(procs.length, loader.getMaxProcId());
654    assertEquals(1, loader.getRunnableCount());
655    assertEquals(0, loader.getCompletedCount());
656    assertEquals(0, loader.getCorruptedCount());
657  }
658
659  @Test
660  public void testLogFileAlreadyExists() throws IOException {
661    final boolean[] tested = { false };
662    WALProcedureStore mStore = Mockito.spy(procStore);
663
664    Answer<Boolean> ans = new Answer<Boolean>() {
665      @Override
666      public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
667        long logId = ((Long) invocationOnMock.getArgument(0)).longValue();
668        switch ((int) logId) {
669          case 2:
670            // Create a file so that real rollWriter() runs into file exists condition
671            Path logFilePath = mStore.getLogFilePath(logId);
672            mStore.getFileSystem().create(logFilePath);
673            break;
674          case 3:
675            // Success only when we retry with logId 3
676            tested[0] = true;
677          default:
678            break;
679        }
680        return (Boolean) invocationOnMock.callRealMethod();
681      }
682    };
683
684    // First time Store has one log file, next id will be 2
685    Mockito.doAnswer(ans).when(mStore).rollWriter(2);
686    // next time its 3
687    Mockito.doAnswer(ans).when(mStore).rollWriter(3);
688
689    mStore.recoverLease();
690    assertTrue(tested[0]);
691  }
692
693  @Test
694  public void testLoadChildren() throws Exception {
695    TestProcedure a = new TestProcedure(1, 0);
696    TestProcedure b = new TestProcedure(2, 1);
697    TestProcedure c = new TestProcedure(3, 1);
698
699    // INIT
700    procStore.insert(a, null);
701
702    // Run A first step
703    a.addStackId(0);
704    procStore.update(a);
705
706    // Run A second step
707    a.addStackId(1);
708    procStore.insert(a, new Procedure[] { b, c });
709
710    // Run B first step
711    b.addStackId(2);
712    procStore.update(b);
713
714    // Run C first and last step
715    c.addStackId(3);
716    procStore.update(c);
717
718    // Run B second setp
719    b.addStackId(4);
720    procStore.update(b);
721
722    // back to A
723    a.addStackId(5);
724    a.setSuccessState();
725    procStore.delete(a, new long[] { b.getProcId(), c.getProcId() });
726    restartAndAssert(3, 0, 1, 0);
727  }
728
729  @Test
730  public void testBatchDelete() throws Exception {
731    for (int i = 1; i < 10; ++i) {
732      procStore.insert(new TestProcedure(i), null);
733    }
734
735    // delete nothing
736    long[] toDelete = new long[] { 1, 2, 3, 4 };
737    procStore.delete(toDelete, 2, 0);
738    LoadCounter loader = restartAndAssert(9, 9, 0, 0);
739    for (int i = 1; i < 10; ++i) {
740      assertEquals(true, loader.isRunnable(i));
741    }
742
743    // delete the full "toDelete" array (2, 4, 6, 8)
744    toDelete = new long[] { 2, 4, 6, 8 };
745    procStore.delete(toDelete, 0, toDelete.length);
746    loader = restartAndAssert(9, 5, 0, 0);
747    for (int i = 1; i < 10; ++i) {
748      assertEquals(i % 2 != 0, loader.isRunnable(i));
749    }
750
751    // delete a slice of "toDelete" (1, 3)
752    toDelete = new long[] { 5, 7, 1, 3, 9 };
753    procStore.delete(toDelete, 2, 2);
754    loader = restartAndAssert(9, 3, 0, 0);
755    for (int i = 1; i < 10; ++i) {
756      assertEquals(i > 3 && i % 2 != 0, loader.isRunnable(i));
757    }
758
759    // delete a single item (5)
760    toDelete = new long[] { 5 };
761    procStore.delete(toDelete, 0, 1);
762    loader = restartAndAssert(9, 2, 0, 0);
763    for (int i = 1; i < 10; ++i) {
764      assertEquals(i > 5 && i % 2 != 0, loader.isRunnable(i));
765    }
766
767    // delete remaining using a slice of "toDelete" (7, 9)
768    toDelete = new long[] { 0, 7, 9 };
769    procStore.delete(toDelete, 1, 2);
770    loader = restartAndAssert(0, 0, 0, 0);
771    for (int i = 1; i < 10; ++i) {
772      assertEquals(false, loader.isRunnable(i));
773    }
774  }
775
776  @Test
777  public void testBatchInsert() throws Exception {
778    final int count = 10;
779    final TestProcedure[] procs = new TestProcedure[count];
780    for (int i = 0; i < procs.length; ++i) {
781      procs[i] = new TestProcedure(i + 1);
782    }
783    procStore.insert(procs);
784    restartAndAssert(count, count, 0, 0);
785
786    for (int i = 0; i < procs.length; ++i) {
787      final long procId = procs[i].getProcId();
788      procStore.delete(procId);
789      restartAndAssert(procId != count ? count : 0, count - (i + 1), 0, 0);
790    }
791    procStore.removeInactiveLogsForTesting();
792    assertEquals("WALs=" + procStore.getActiveLogs(), 1, procStore.getActiveLogs().size());
793  }
794
795  @Test
796  public void testWALDirAndWALArchiveDir() throws IOException {
797    Configuration conf = htu.getConfiguration();
798    procStore = createWALProcedureStore(conf);
799    assertEquals(procStore.getFileSystem(), procStore.getWalArchiveDir().getFileSystem(conf));
800  }
801
802  private WALProcedureStore createWALProcedureStore(Configuration conf) throws IOException {
803    return new WALProcedureStore(conf, new LeaseRecovery() {
804      @Override
805      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
806        // no-op
807      }
808    });
809  }
810
811  private LoadCounter restartAndAssert(long maxProcId, long runnableCount, int completedCount,
812    int corruptedCount) throws Exception {
813    return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId, runnableCount,
814      completedCount, corruptedCount);
815  }
816
817  private void corruptLog(final FileStatus logFile, final long dropBytes) throws IOException {
818    assertTrue(logFile.getLen() > dropBytes);
819    LOG.debug(
820      "corrupt log " + logFile.getPath() + " size=" + logFile.getLen() + " drop=" + dropBytes);
821    Path tmpPath = new Path(testDir, "corrupted.log");
822    InputStream in = fs.open(logFile.getPath());
823    OutputStream out = fs.create(tmpPath);
824    IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true);
825    if (!fs.rename(tmpPath, logFile.getPath())) {
826      throw new IOException("Unable to rename");
827    }
828  }
829
830  private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
831    LOG.debug("expected: " + procIds);
832    LoadCounter loader = new LoadCounter();
833    storeRestart(loader);
834    assertEquals(procIds.size(), loader.getLoadedCount());
835    assertEquals(0, loader.getCorruptedCount());
836  }
837
838  public static class TestSequentialProcedure extends SequentialProcedure<Void> {
839
840    private static final AtomicLong seqId = new AtomicLong(0);
841
842    public TestSequentialProcedure() {
843      setProcId(seqId.incrementAndGet());
844    }
845
846    @Override
847    protected Procedure<Void>[] execute(Void env) {
848      return null;
849    }
850
851    @Override
852    protected void rollback(Void env) {
853    }
854
855    @Override
856    protected boolean abort(Void env) {
857      return false;
858    }
859
860    @Override
861    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
862      long procId = getProcId();
863      if (procId % 2 == 0) {
864        Int64Value.Builder builder = Int64Value.newBuilder().setValue(procId);
865        serializer.serialize(builder.build());
866      }
867    }
868
869    @Override
870    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
871      long procId = getProcId();
872      if (procId % 2 == 0) {
873        Int64Value value = serializer.deserialize(Int64Value.class);
874        assertEquals(procId, value.getValue());
875      }
876    }
877  }
878}