001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertTrue; 021 022import java.io.IOException; 023import java.util.Arrays; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.Exchanger; 026import org.apache.hadoop.fs.FSDataInputStream; 027import org.apache.hadoop.fs.FSDataOutputStream; 028import org.apache.hadoop.fs.FileStatus; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 033import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 034import org.apache.hadoop.hbase.testclassification.MasterTests; 035import org.apache.hadoop.hbase.testclassification.SmallTests; 036import org.junit.BeforeClass; 037import org.junit.ClassRule; 038import org.junit.Rule; 039import org.junit.Test; 040import org.junit.experimental.categories.Category; 041import org.junit.rules.TestName; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; 046 047@Category({ MasterTests.class, SmallTests.class }) 048public class TestProcedureCleanup { 049 050 @ClassRule 051 public static final HBaseClassTestRule CLASS_RULE = 052 HBaseClassTestRule.forClass(TestProcedureCleanup.class); 053 054 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureCleanup.class); 055 056 private static final int PROCEDURE_EXECUTOR_SLOTS = 2; 057 058 private static WALProcedureStore procStore; 059 060 private static ProcedureExecutor<Void> procExecutor; 061 062 private static HBaseCommonTestingUtil htu; 063 064 private static FileSystem fs; 065 private static Path testDir; 066 private static Path logDir; 067 068 @Rule 069 public final TestName name = new TestName(); 070 071 private void createProcExecutor() throws Exception { 072 logDir = new Path(testDir, name.getMethodName()); 073 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 074 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore); 075 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 076 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true, true); 077 } 078 079 @BeforeClass 080 public static void setUp() throws Exception { 081 htu = new HBaseCommonTestingUtil(); 082 htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true); 083 // NOTE: The executor will be created by each test 084 testDir = htu.getDataTestDir(); 085 fs = testDir.getFileSystem(htu.getConfiguration()); 086 assertTrue(testDir.depth() > 1); 087 } 088 089 @Test 090 public void testProcedureShouldNotCleanOnLoad() throws Exception { 091 createProcExecutor(); 092 final RootProcedure proc = new RootProcedure(); 093 long rootProc = procExecutor.submitProcedure(proc); 094 LOG.info("Begin to execute " + rootProc); 095 // wait until the child procedure arrival 096 htu.waitFor(10000, () -> procExecutor.getProcedures().size() >= 2); 097 SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor.getProcedures().get(1); 098 // wait until the suspendProcedure executed 099 suspendProcedure.latch.countDown(); 100 Thread.sleep(100); 101 // roll the procedure log 102 LOG.info("Begin to roll log "); 103 procStore.rollWriterForTesting(); 104 LOG.info("finish to roll log "); 105 Thread.sleep(500); 106 LOG.info("begin to restart1 "); 107 ProcedureTestingUtility.restart(procExecutor, true); 108 LOG.info("finish to restart1 "); 109 assertTrue(procExecutor.getProcedure(rootProc) != null); 110 Thread.sleep(500); 111 LOG.info("begin to restart2 "); 112 ProcedureTestingUtility.restart(procExecutor, true); 113 LOG.info("finish to restart2 "); 114 assertTrue(procExecutor.getProcedure(rootProc) != null); 115 } 116 117 @Test 118 public void testProcedureUpdatedShouldClean() throws Exception { 119 createProcExecutor(); 120 SuspendProcedure suspendProcedure = new SuspendProcedure(); 121 long suspendProc = procExecutor.submitProcedure(suspendProcedure); 122 LOG.info("Begin to execute " + suspendProc); 123 suspendProcedure.latch.countDown(); 124 Thread.sleep(500); 125 LOG.info("begin to restart1 "); 126 ProcedureTestingUtility.restart(procExecutor, true); 127 LOG.info("finish to restart1 "); 128 htu.waitFor(10000, () -> procExecutor.getProcedure(suspendProc) != null); 129 // Wait until the suspendProc executed after restart 130 suspendProcedure = (SuspendProcedure) procExecutor.getProcedure(suspendProc); 131 suspendProcedure.latch.countDown(); 132 Thread.sleep(500); 133 // Should be 1 log since the suspendProcedure is updated in the new log 134 assertTrue(procStore.getActiveLogs().size() == 1); 135 // restart procExecutor 136 LOG.info("begin to restart2"); 137 // Restart the executor but do not start the workers. 138 // Otherwise, the suspendProcedure will soon be executed and the oldest log 139 // will be cleaned, leaving only the newest log. 140 ProcedureTestingUtility.restart(procExecutor, true, false); 141 LOG.info("finish to restart2"); 142 // There should be two active logs 143 assertTrue(procStore.getActiveLogs().size() == 2); 144 procExecutor.startWorkers(); 145 146 } 147 148 @Test 149 public void testProcedureDeletedShouldClean() throws Exception { 150 createProcExecutor(); 151 WaitProcedure waitProcedure = new WaitProcedure(); 152 long waitProce = procExecutor.submitProcedure(waitProcedure); 153 LOG.info("Begin to execute " + waitProce); 154 Thread.sleep(500); 155 LOG.info("begin to restart1 "); 156 ProcedureTestingUtility.restart(procExecutor, true); 157 LOG.info("finish to restart1 "); 158 htu.waitFor(10000, () -> procExecutor.getProcedure(waitProce) != null); 159 // Wait until the suspendProc executed after restart 160 waitProcedure = (WaitProcedure) procExecutor.getProcedure(waitProce); 161 waitProcedure.latch.countDown(); 162 Thread.sleep(500); 163 // Should be 1 log since the suspendProcedure is updated in the new log 164 assertTrue(procStore.getActiveLogs().size() == 1); 165 // restart procExecutor 166 LOG.info("begin to restart2"); 167 // Restart the executor but do not start the workers. 168 // Otherwise, the suspendProcedure will soon be executed and the oldest log 169 // will be cleaned, leaving only the newest log. 170 ProcedureTestingUtility.restart(procExecutor, true, false); 171 LOG.info("finish to restart2"); 172 // There should be two active logs 173 assertTrue(procStore.getActiveLogs().size() == 2); 174 procExecutor.startWorkers(); 175 } 176 177 private void corrupt(FileStatus file) throws IOException { 178 LOG.info("Corrupt " + file); 179 Path tmpFile = file.getPath().suffix(".tmp"); 180 // remove the last byte to make the trailer corrupted 181 try (FSDataInputStream in = fs.open(file.getPath()); 182 FSDataOutputStream out = fs.create(tmpFile)) { 183 ByteStreams.copy(ByteStreams.limit(in, file.getLen() - 1), out); 184 } 185 fs.delete(file.getPath(), false); 186 fs.rename(tmpFile, file.getPath()); 187 } 188 189 public static final class ExchangeProcedure extends ProcedureTestingUtility.NoopProcedure<Void> { 190 191 private final Exchanger<Boolean> exchanger = new Exchanger<>(); 192 193 @SuppressWarnings("unchecked") 194 @Override 195 protected Procedure<Void>[] execute(Void env) 196 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 197 if (exchanger.exchange(Boolean.TRUE)) { 198 return new Procedure[] { this }; 199 } else { 200 return null; 201 } 202 } 203 } 204 205 @Test 206 public void testResetDeleteWhenBuildingHoldingCleanupTracker() throws Exception { 207 createProcExecutor(); 208 ExchangeProcedure proc1 = new ExchangeProcedure(); 209 ExchangeProcedure proc2 = new ExchangeProcedure(); 210 procExecutor.submitProcedure(proc1); 211 long procId2 = procExecutor.submitProcedure(proc2); 212 Thread.sleep(500); 213 procStore.rollWriterForTesting(); 214 proc1.exchanger.exchange(Boolean.TRUE); 215 Thread.sleep(500); 216 217 FileStatus[] walFiles = fs.listStatus(logDir); 218 Arrays.sort(walFiles, (f1, f2) -> f1.getPath().getName().compareTo(f2.getPath().getName())); 219 // corrupt the first proc wal file, so we will have a partial tracker for it after restarting 220 corrupt(walFiles[0]); 221 ProcedureTestingUtility.restart(procExecutor, false, true); 222 // also update proc2, which means that all the procedures in the first proc wal have been 223 // updated and it should be deleted. 224 proc2 = (ExchangeProcedure) procExecutor.getProcedure(procId2); 225 proc2.exchanger.exchange(Boolean.TRUE); 226 htu.waitFor(10000, () -> !fs.exists(walFiles[0].getPath())); 227 } 228 229 public static class WaitProcedure extends ProcedureTestingUtility.NoopProcedure<Void> { 230 public WaitProcedure() { 231 super(); 232 } 233 234 private CountDownLatch latch = new CountDownLatch(1); 235 236 @Override 237 protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException { 238 // Always wait here 239 LOG.info("wait here"); 240 try { 241 latch.await(); 242 } catch (Throwable t) { 243 244 } 245 LOG.info("finished"); 246 return null; 247 } 248 } 249 250 public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<Void> { 251 public SuspendProcedure() { 252 super(); 253 } 254 255 private CountDownLatch latch = new CountDownLatch(1); 256 257 @Override 258 protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException { 259 // Always suspend the procedure 260 LOG.info("suspend here"); 261 latch.countDown(); 262 throw new ProcedureSuspendedException(); 263 } 264 } 265 266 public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<Void> { 267 private boolean childSpwaned = false; 268 269 public RootProcedure() { 270 super(); 271 } 272 273 @Override 274 protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException { 275 if (!childSpwaned) { 276 childSpwaned = true; 277 return new Procedure[] { new SuspendProcedure() }; 278 } else { 279 return null; 280 } 281 } 282 } 283}