001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import static org.mockito.ArgumentMatchers.any; 021import static org.mockito.ArgumentMatchers.anyString; 022import static org.mockito.Mockito.mock; 023import static org.mockito.Mockito.never; 024import static org.mockito.Mockito.spy; 025import static org.mockito.Mockito.verify; 026import static org.mockito.Mockito.when; 027 028import java.util.ArrayList; 029import java.util.List; 030import java.util.concurrent.CountDownLatch; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.errorhandling.ForeignException; 033import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 034import org.apache.hadoop.hbase.testclassification.MasterTests; 035import org.apache.hadoop.hbase.testclassification.SmallTests; 036import org.junit.Before; 037import org.junit.ClassRule; 038import org.junit.Test; 039import org.junit.experimental.categories.Category; 040 041/** 042 * Demonstrate how Procedure handles single members, multiple members, and errors semantics 043 */ 044@Category({ MasterTests.class, SmallTests.class }) 045public class TestProcedure { 046 047 @ClassRule 048 public static final HBaseClassTestRule CLASS_RULE = 049 HBaseClassTestRule.forClass(TestProcedure.class); 050 051 ProcedureCoordinator coord; 052 053 @Before 054 public void setup() { 055 coord = mock(ProcedureCoordinator.class); 056 final ProcedureCoordinatorRpcs comms = mock(ProcedureCoordinatorRpcs.class); 057 when(coord.getRpcs()).thenReturn(comms); // make it not null 058 } 059 060 static class LatchedProcedure extends Procedure { 061 CountDownLatch startedAcquireBarrier = new CountDownLatch(1); 062 CountDownLatch startedDuringBarrier = new CountDownLatch(1); 063 CountDownLatch completedProcedure = new CountDownLatch(1); 064 065 public LatchedProcedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor, 066 long wakeFreq, long timeout, String opName, byte[] data, List<String> expectedMembers) { 067 super(coord, monitor, wakeFreq, timeout, opName, data, expectedMembers); 068 } 069 070 @Override 071 public void sendGlobalBarrierStart() { 072 startedAcquireBarrier.countDown(); 073 } 074 075 @Override 076 public void sendGlobalBarrierReached() { 077 startedDuringBarrier.countDown(); 078 } 079 080 @Override 081 public void sendGlobalBarrierComplete() { 082 completedProcedure.countDown(); 083 } 084 }; 085 086 /** 087 * With a single member, verify ordered execution. The Coordinator side is run in a separate 088 * thread so we can only trigger from members and wait for particular state latches. 089 */ 090 @Test 091 public void testSingleMember() throws Exception { 092 // The member 093 List<String> members = new ArrayList<>(); 094 members.add("member"); 095 LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100, 096 Integer.MAX_VALUE, "op", null, members); 097 final LatchedProcedure procspy = spy(proc); 098 // coordinator: start the barrier procedure 099 new Thread() { 100 @Override 101 public void run() { 102 procspy.call(); 103 } 104 }.start(); 105 106 // coordinator: wait for the barrier to be acquired, then send start barrier 107 proc.startedAcquireBarrier.await(); 108 109 // we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked. 110 verify(procspy).sendGlobalBarrierStart(); 111 verify(procspy, never()).sendGlobalBarrierReached(); 112 verify(procspy, never()).sendGlobalBarrierComplete(); 113 verify(procspy, never()).barrierAcquiredByMember(anyString()); 114 115 // member: trigger global barrier acquisition 116 proc.barrierAcquiredByMember(members.get(0)); 117 118 // coordinator: wait for global barrier to be acquired. 119 proc.acquiredBarrierLatch.await(); 120 verify(procspy).sendGlobalBarrierStart(); // old news 121 122 // since two threads, we cannot guarantee that {@link Procedure#sendSatsifiedBarrier()} was 123 // or was not called here. 124 125 // member: trigger global barrier release 126 proc.barrierReleasedByMember(members.get(0), new byte[0]); 127 128 // coordinator: wait for procedure to be completed 129 proc.completedProcedure.await(); 130 verify(procspy).sendGlobalBarrierReached(); 131 verify(procspy).sendGlobalBarrierComplete(); 132 verify(procspy, never()).receive(any()); 133 } 134 135 @Test 136 public void testMultipleMember() throws Exception { 137 // 2 members 138 List<String> members = new ArrayList<>(); 139 members.add("member1"); 140 members.add("member2"); 141 142 LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100, 143 Integer.MAX_VALUE, "op", null, members); 144 final LatchedProcedure procspy = spy(proc); 145 // start the barrier procedure 146 new Thread() { 147 @Override 148 public void run() { 149 procspy.call(); 150 } 151 }.start(); 152 153 // coordinator: wait for the barrier to be acquired, then send start barrier 154 procspy.startedAcquireBarrier.await(); 155 156 // we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked. 157 verify(procspy).sendGlobalBarrierStart(); 158 verify(procspy, never()).sendGlobalBarrierReached(); 159 verify(procspy, never()).sendGlobalBarrierComplete(); 160 verify(procspy, never()).barrierAcquiredByMember(anyString()); // no externals 161 162 // member0: [1/2] trigger global barrier acquisition. 163 procspy.barrierAcquiredByMember(members.get(0)); 164 165 // coordinator not satisified. 166 verify(procspy).sendGlobalBarrierStart(); 167 verify(procspy, never()).sendGlobalBarrierReached(); 168 verify(procspy, never()).sendGlobalBarrierComplete(); 169 170 // member 1: [2/2] trigger global barrier acquisition. 171 procspy.barrierAcquiredByMember(members.get(1)); 172 173 // coordinator: wait for global barrier to be acquired. 174 procspy.startedDuringBarrier.await(); 175 verify(procspy).sendGlobalBarrierStart(); // old news 176 177 // member 1, 2: trigger global barrier release 178 procspy.barrierReleasedByMember(members.get(0), new byte[0]); 179 procspy.barrierReleasedByMember(members.get(1), new byte[0]); 180 181 // coordinator wait for procedure to be completed 182 procspy.completedProcedure.await(); 183 verify(procspy).sendGlobalBarrierReached(); 184 verify(procspy).sendGlobalBarrierComplete(); 185 verify(procspy, never()).receive(any()); 186 } 187 188 @Test 189 public void testErrorPropagation() throws Exception { 190 List<String> members = new ArrayList<>(); 191 members.add("member"); 192 Procedure proc = new Procedure(coord, new ForeignExceptionDispatcher(), 100, Integer.MAX_VALUE, 193 "op", null, members); 194 final Procedure procspy = spy(proc); 195 196 ForeignException cause = new ForeignException("SRC", "External Exception"); 197 proc.receive(cause); 198 199 // start the barrier procedure 200 Thread t = new Thread() { 201 @Override 202 public void run() { 203 procspy.call(); 204 } 205 }; 206 t.start(); 207 t.join(); 208 209 verify(procspy, never()).sendGlobalBarrierStart(); 210 verify(procspy, never()).sendGlobalBarrierReached(); 211 verify(procspy).sendGlobalBarrierComplete(); 212 } 213 214 @Test 215 public void testBarrieredErrorPropagation() throws Exception { 216 List<String> members = new ArrayList<>(); 217 members.add("member"); 218 LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100, 219 Integer.MAX_VALUE, "op", null, members); 220 final LatchedProcedure procspy = spy(proc); 221 222 // start the barrier procedure 223 Thread t = new Thread() { 224 @Override 225 public void run() { 226 procspy.call(); 227 } 228 }; 229 t.start(); 230 231 // now test that we can put an error in before the commit phase runs 232 procspy.startedAcquireBarrier.await(); 233 ForeignException cause = new ForeignException("SRC", "External Exception"); 234 procspy.receive(cause); 235 procspy.barrierAcquiredByMember(members.get(0)); 236 t.join(); 237 238 // verify state of all the object 239 verify(procspy).sendGlobalBarrierStart(); 240 verify(procspy).sendGlobalBarrierComplete(); 241 verify(procspy, never()).sendGlobalBarrierReached(); 242 } 243}