001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
046
047@Category({ MasterTests.class, SmallTests.class })
048public class TestProcedureCleanup {
049
050  @ClassRule
051  public static final HBaseClassTestRule CLASS_RULE =
052    HBaseClassTestRule.forClass(TestProcedureCleanup.class);
053
054  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureCleanup.class);
055
056  private static final int PROCEDURE_EXECUTOR_SLOTS = 2;
057
058  private static WALProcedureStore procStore;
059
060  private static ProcedureExecutor<Void> procExecutor;
061
062  private static HBaseCommonTestingUtil htu;
063
064  private static FileSystem fs;
065  private static Path testDir;
066  private static Path logDir;
067
068  @Rule
069  public final TestName name = new TestName();
070
071  private void createProcExecutor() throws Exception {
072    logDir = new Path(testDir, name.getMethodName());
073    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
074    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore);
075    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
076    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true, true);
077  }
078
079  @BeforeClass
080  public static void setUp() throws Exception {
081    htu = new HBaseCommonTestingUtil();
082    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true);
083    // NOTE: The executor will be created by each test
084    testDir = htu.getDataTestDir();
085    fs = testDir.getFileSystem(htu.getConfiguration());
086    assertTrue(testDir.depth() > 1);
087  }
088
089  @Test
090  public void testProcedureShouldNotCleanOnLoad() throws Exception {
091    createProcExecutor();
092    final RootProcedure proc = new RootProcedure();
093    long rootProc = procExecutor.submitProcedure(proc);
094    LOG.info("Begin to execute " + rootProc);
095    // wait until the child procedure arrival
096    htu.waitFor(10000, () -> procExecutor.getProcedures().size() >= 2);
097    SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor.getProcedures().get(1);
098    // wait until the suspendProcedure executed
099    suspendProcedure.latch.countDown();
100    Thread.sleep(100);
101    // roll the procedure log
102    LOG.info("Begin to roll log ");
103    procStore.rollWriterForTesting();
104    LOG.info("finish to roll log ");
105    Thread.sleep(500);
106    LOG.info("begin to restart1 ");
107    ProcedureTestingUtility.restart(procExecutor, true);
108    LOG.info("finish to restart1 ");
109    assertTrue(procExecutor.getProcedure(rootProc) != null);
110    Thread.sleep(500);
111    LOG.info("begin to restart2 ");
112    ProcedureTestingUtility.restart(procExecutor, true);
113    LOG.info("finish to restart2 ");
114    assertTrue(procExecutor.getProcedure(rootProc) != null);
115  }
116
117  @Test
118  public void testProcedureUpdatedShouldClean() throws Exception {
119    createProcExecutor();
120    SuspendProcedure suspendProcedure = new SuspendProcedure();
121    long suspendProc = procExecutor.submitProcedure(suspendProcedure);
122    LOG.info("Begin to execute " + suspendProc);
123    suspendProcedure.latch.countDown();
124    Thread.sleep(500);
125    LOG.info("begin to restart1 ");
126    ProcedureTestingUtility.restart(procExecutor, true);
127    LOG.info("finish to restart1 ");
128    htu.waitFor(10000, () -> procExecutor.getProcedure(suspendProc) != null);
129    // Wait until the suspendProc executed after restart
130    suspendProcedure = (SuspendProcedure) procExecutor.getProcedure(suspendProc);
131    suspendProcedure.latch.countDown();
132    Thread.sleep(500);
133    // Should be 1 log since the suspendProcedure is updated in the new log
134    assertTrue(procStore.getActiveLogs().size() == 1);
135    // restart procExecutor
136    LOG.info("begin to restart2");
137    // Restart the executor but do not start the workers.
138    // Otherwise, the suspendProcedure will soon be executed and the oldest log
139    // will be cleaned, leaving only the newest log.
140    ProcedureTestingUtility.restart(procExecutor, true, false);
141    LOG.info("finish to restart2");
142    // There should be two active logs
143    assertTrue(procStore.getActiveLogs().size() == 2);
144    procExecutor.startWorkers();
145
146  }
147
148  @Test
149  public void testProcedureDeletedShouldClean() throws Exception {
150    createProcExecutor();
151    WaitProcedure waitProcedure = new WaitProcedure();
152    long waitProce = procExecutor.submitProcedure(waitProcedure);
153    LOG.info("Begin to execute " + waitProce);
154    Thread.sleep(500);
155    LOG.info("begin to restart1 ");
156    ProcedureTestingUtility.restart(procExecutor, true);
157    LOG.info("finish to restart1 ");
158    htu.waitFor(10000, () -> procExecutor.getProcedure(waitProce) != null);
159    // Wait until the suspendProc executed after restart
160    waitProcedure = (WaitProcedure) procExecutor.getProcedure(waitProce);
161    waitProcedure.latch.countDown();
162    Thread.sleep(500);
163    // Should be 1 log since the suspendProcedure is updated in the new log
164    assertTrue(procStore.getActiveLogs().size() == 1);
165    // restart procExecutor
166    LOG.info("begin to restart2");
167    // Restart the executor but do not start the workers.
168    // Otherwise, the suspendProcedure will soon be executed and the oldest log
169    // will be cleaned, leaving only the newest log.
170    ProcedureTestingUtility.restart(procExecutor, true, false);
171    LOG.info("finish to restart2");
172    // There should be two active logs
173    assertTrue(procStore.getActiveLogs().size() == 2);
174    procExecutor.startWorkers();
175  }
176
177  private void corrupt(FileStatus file) throws IOException {
178    LOG.info("Corrupt " + file);
179    Path tmpFile = file.getPath().suffix(".tmp");
180    // remove the last byte to make the trailer corrupted
181    try (FSDataInputStream in = fs.open(file.getPath());
182      FSDataOutputStream out = fs.create(tmpFile)) {
183      ByteStreams.copy(ByteStreams.limit(in, file.getLen() - 1), out);
184    }
185    fs.delete(file.getPath(), false);
186    fs.rename(tmpFile, file.getPath());
187  }
188
189  public static final class ExchangeProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
190
191    private final Exchanger<Boolean> exchanger = new Exchanger<>();
192
193    @SuppressWarnings("unchecked")
194    @Override
195    protected Procedure<Void>[] execute(Void env)
196      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
197      if (exchanger.exchange(Boolean.TRUE)) {
198        return new Procedure[] { this };
199      } else {
200        return null;
201      }
202    }
203  }
204
205  @Test
206  public void testResetDeleteWhenBuildingHoldingCleanupTracker() throws Exception {
207    createProcExecutor();
208    ExchangeProcedure proc1 = new ExchangeProcedure();
209    ExchangeProcedure proc2 = new ExchangeProcedure();
210    procExecutor.submitProcedure(proc1);
211    long procId2 = procExecutor.submitProcedure(proc2);
212    Thread.sleep(500);
213    procStore.rollWriterForTesting();
214    proc1.exchanger.exchange(Boolean.TRUE);
215    Thread.sleep(500);
216
217    FileStatus[] walFiles = fs.listStatus(logDir);
218    Arrays.sort(walFiles, (f1, f2) -> f1.getPath().getName().compareTo(f2.getPath().getName()));
219    // corrupt the first proc wal file, so we will have a partial tracker for it after restarting
220    corrupt(walFiles[0]);
221    ProcedureTestingUtility.restart(procExecutor, false, true);
222    // also update proc2, which means that all the procedures in the first proc wal have been
223    // updated and it should be deleted.
224    proc2 = (ExchangeProcedure) procExecutor.getProcedure(procId2);
225    proc2.exchanger.exchange(Boolean.TRUE);
226    htu.waitFor(10000, () -> !fs.exists(walFiles[0].getPath()));
227  }
228
229  public static class WaitProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
230    public WaitProcedure() {
231      super();
232    }
233
234    private CountDownLatch latch = new CountDownLatch(1);
235
236    @Override
237    protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException {
238      // Always wait here
239      LOG.info("wait here");
240      try {
241        latch.await();
242      } catch (Throwable t) {
243
244      }
245      LOG.info("finished");
246      return null;
247    }
248  }
249
250  public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
251    public SuspendProcedure() {
252      super();
253    }
254
255    private CountDownLatch latch = new CountDownLatch(1);
256
257    @Override
258    protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException {
259      // Always suspend the procedure
260      LOG.info("suspend here");
261      latch.countDown();
262      throw new ProcedureSuspendedException();
263    }
264  }
265
266  public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
267    private boolean childSpwaned = false;
268
269    public RootProcedure() {
270      super();
271    }
272
273    @Override
274    protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException {
275      if (!childSpwaned) {
276        childSpwaned = true;
277        return new Procedure[] { new SuspendProcedure() };
278      } else {
279        return null;
280      }
281    }
282  }
283}