001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.io.UncheckedIOException; 022import java.util.LinkedHashMap; 023import java.util.concurrent.atomic.AtomicBoolean; 024import java.util.concurrent.atomic.AtomicLong; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 027import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; 028import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase; 029import org.apache.hadoop.hbase.procedure2.store.ProcedureTree; 030import org.apache.hadoop.hbase.testclassification.MasterTests; 031import org.apache.hadoop.hbase.testclassification.SmallTests; 032import org.apache.hadoop.hbase.util.AtomicUtils; 033import org.junit.After; 034import org.junit.Before; 035import org.junit.ClassRule; 036import org.junit.Test; 037import org.junit.experimental.categories.Category; 038 039import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 040 041/** 042 * Testcase for HBASE-28210, where we persist the procedure which has been inserted later to 043 * {@link RootProcedureState} first and then crash, and then cause holes in stack ids when loading, 044 * and finally fail the start up of master. 045 */ 046@Category({ MasterTests.class, SmallTests.class }) 047public class TestStackIdHoles { 048 049 @ClassRule 050 public static final HBaseClassTestRule CLASS_RULE = 051 HBaseClassTestRule.forClass(TestStackIdHoles.class); 052 053 private final class DummyProcedureStore extends ProcedureStoreBase { 054 055 private int numThreads; 056 057 private final LinkedHashMap<Long, ProcedureProtos.Procedure> procMap = 058 new LinkedHashMap<Long, ProcedureProtos.Procedure>(); 059 060 private final AtomicLong maxProcId = new AtomicLong(0); 061 062 private final AtomicBoolean updated = new AtomicBoolean(false); 063 064 @Override 065 public void start(int numThreads) throws IOException { 066 this.numThreads = numThreads; 067 setRunning(true); 068 } 069 070 @Override 071 public void stop(boolean abort) { 072 } 073 074 @Override 075 public int getNumThreads() { 076 return numThreads; 077 } 078 079 @Override 080 public int setRunningProcedureCount(int count) { 081 return count; 082 } 083 084 @Override 085 public void recoverLease() throws IOException { 086 } 087 088 @Override 089 public void load(ProcedureLoader loader) throws IOException { 090 loader.setMaxProcId(maxProcId.get()); 091 ProcedureTree tree = ProcedureTree.build(procMap.values()); 092 loader.load(tree.getValidProcs()); 093 loader.handleCorrupted(tree.getCorruptedProcs()); 094 } 095 096 @Override 097 public void insert(Procedure<?> proc, Procedure<?>[] subprocs) { 098 long max = proc.getProcId(); 099 synchronized (procMap) { 100 try { 101 procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc)); 102 if (subprocs != null) { 103 for (Procedure<?> p : subprocs) { 104 procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p)); 105 max = Math.max(max, p.getProcId()); 106 } 107 } 108 } catch (IOException e) { 109 throw new UncheckedIOException(e); 110 } 111 } 112 AtomicUtils.updateMax(maxProcId, max); 113 } 114 115 @Override 116 public void insert(Procedure<?>[] procs) { 117 long max = -1; 118 synchronized (procMap) { 119 try { 120 for (Procedure<?> p : procs) { 121 procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p)); 122 max = Math.max(max, p.getProcId()); 123 } 124 } catch (IOException e) { 125 throw new UncheckedIOException(e); 126 } 127 } 128 AtomicUtils.updateMax(maxProcId, max); 129 } 130 131 @Override 132 public void update(Procedure<?> proc) { 133 // inject a sleep to simulate the scenario in HBASE-28210 134 if (proc.hasParent() && proc.getStackIndexes() != null) { 135 int lastStackId = proc.getStackIndexes()[proc.getStackIndexes().length - 1]; 136 try { 137 // sleep more times if the stack id is smaller 138 Thread.sleep(100L * (10 - lastStackId)); 139 } catch (InterruptedException e) { 140 Thread.currentThread().interrupt(); 141 return; 142 } 143 // simulate the failure when updating the second sub procedure 144 if (!updated.compareAndSet(false, true)) { 145 procExec.stop(); 146 throw new RuntimeException("inject error"); 147 } 148 } 149 synchronized (procMap) { 150 try { 151 procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc)); 152 } catch (IOException e) { 153 throw new UncheckedIOException(e); 154 } 155 } 156 } 157 158 @Override 159 public void delete(long procId) { 160 synchronized (procMap) { 161 procMap.remove(procId); 162 } 163 } 164 165 @Override 166 public void delete(Procedure<?> parentProc, long[] subProcIds) { 167 synchronized (procMap) { 168 try { 169 procMap.put(parentProc.getProcId(), ProcedureUtil.convertToProtoProcedure(parentProc)); 170 for (long procId : subProcIds) { 171 procMap.remove(procId); 172 } 173 } catch (IOException e) { 174 throw new UncheckedIOException(e); 175 } 176 } 177 } 178 179 @Override 180 public void delete(long[] procIds, int offset, int count) { 181 synchronized (procMap) { 182 for (int i = 0; i < count; i++) { 183 long procId = procIds[offset + i]; 184 procMap.remove(procId); 185 } 186 } 187 } 188 } 189 190 private final HBaseCommonTestingUtil HBTU = new HBaseCommonTestingUtil(); 191 192 private DummyProcedureStore procStore; 193 194 private ProcedureExecutor<Void> procExec; 195 196 @Before 197 public void setUp() throws IOException { 198 procStore = new DummyProcedureStore(); 199 procStore.start(4); 200 procExec = new ProcedureExecutor<Void>(HBTU.getConfiguration(), null, procStore); 201 procExec.init(4, true); 202 procExec.startWorkers(); 203 } 204 205 @After 206 public void tearDown() { 207 procExec.stop(); 208 } 209 210 public static class DummyProcedure extends NoopProcedure<Void> { 211 212 @Override 213 protected Procedure<Void>[] execute(Void env) 214 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 215 return new Procedure[] { new NoopProcedure<Void>(), new NoopProcedure<Void>() }; 216 } 217 } 218 219 @Test 220 public void testLoad() throws IOException { 221 procExec.submitProcedure(new DummyProcedure()); 222 // wait for the error 223 HBTU.waitFor(30000, () -> !procExec.isRunning()); 224 procExec = new ProcedureExecutor<Void>(HBTU.getConfiguration(), null, procStore); 225 // make sure there is no error while loading 226 procExec.init(4, true); 227 } 228}