001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertEquals; 021 022import java.util.concurrent.Semaphore; 023import java.util.concurrent.TimeUnit; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 027import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; 028import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; 029import org.apache.hadoop.hbase.testclassification.MasterTests; 030import org.apache.hadoop.hbase.testclassification.SmallTests; 031import org.apache.hadoop.hbase.util.Threads; 032import org.junit.After; 033import org.junit.Before; 034import org.junit.ClassRule; 035import org.junit.Test; 036import org.junit.experimental.categories.Category; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040@Category({ MasterTests.class, SmallTests.class }) 041public class TestProcedureExecutor { 042 043 @ClassRule 044 public static final HBaseClassTestRule CLASS_RULE = 045 HBaseClassTestRule.forClass(TestProcedureExecutor.class); 046 047 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureExecutor.class); 048 049 private TestProcEnv procEnv; 050 private NoopProcedureStore procStore; 051 private ProcedureExecutor<TestProcEnv> procExecutor; 052 053 private HBaseCommonTestingUtil htu; 054 055 @Before 056 public void setUp() throws Exception { 057 htu = new HBaseCommonTestingUtil(); 058 059 // NOTE: The executor will be created by each test 060 procEnv = new TestProcEnv(); 061 procStore = new NoopProcedureStore(); 062 procStore.start(1); 063 } 064 065 @After 066 public void tearDown() throws Exception { 067 procExecutor.stop(); 068 procStore.stop(false); 069 procExecutor.join(); 070 } 071 072 private void createNewExecutor(final Configuration conf, final int numThreads) throws Exception { 073 procExecutor = new ProcedureExecutor<>(conf, procEnv, procStore); 074 ProcedureTestingUtility.initAndStartWorkers(procExecutor, numThreads, true); 075 } 076 077 @Test 078 public void testWorkerStuck() throws Exception { 079 // replace the executor 080 final Configuration conf = new Configuration(htu.getConfiguration()); 081 conf.setFloat("hbase.procedure.worker.add.stuck.percentage", 0.5f); 082 conf.setInt("hbase.procedure.worker.monitor.interval.msec", 500); 083 conf.setInt("hbase.procedure.worker.stuck.threshold.msec", 750); 084 085 final int NUM_THREADS = 2; 086 createNewExecutor(conf, NUM_THREADS); 087 088 Semaphore latch1 = new Semaphore(2); 089 latch1.acquire(2); 090 BusyWaitProcedure busyProc1 = new BusyWaitProcedure(latch1); 091 092 Semaphore latch2 = new Semaphore(2); 093 latch2.acquire(2); 094 BusyWaitProcedure busyProc2 = new BusyWaitProcedure(latch2); 095 096 long busyProcId1 = procExecutor.submitProcedure(busyProc1); 097 long busyProcId2 = procExecutor.submitProcedure(busyProc2); 098 long otherProcId = procExecutor.submitProcedure(new NoopProcedure()); 099 100 // wait until a new worker is being created 101 int threads1 = waitThreadCount(NUM_THREADS + 1); 102 LOG.info("new threads got created: " + (threads1 - NUM_THREADS)); 103 assertEquals(NUM_THREADS + 1, threads1); 104 105 ProcedureTestingUtility.waitProcedure(procExecutor, otherProcId); 106 assertEquals(true, procExecutor.isFinished(otherProcId)); 107 ProcedureTestingUtility.assertProcNotFailed(procExecutor, otherProcId); 108 109 assertEquals(true, procExecutor.isRunning()); 110 assertEquals(false, procExecutor.isFinished(busyProcId1)); 111 assertEquals(false, procExecutor.isFinished(busyProcId2)); 112 113 // terminate the busy procedures 114 latch1.release(); 115 latch2.release(); 116 117 LOG.info("set keep alive and wait threads being removed"); 118 procExecutor.setKeepAliveTime(500L, TimeUnit.MILLISECONDS); 119 int threads2 = waitThreadCount(NUM_THREADS); 120 LOG.info("threads got removed: " + (threads1 - threads2)); 121 assertEquals(NUM_THREADS, threads2); 122 123 // terminate the busy procedures 124 latch1.release(); 125 latch2.release(); 126 127 // wait for all procs to complete 128 ProcedureTestingUtility.waitProcedure(procExecutor, busyProcId1); 129 ProcedureTestingUtility.waitProcedure(procExecutor, busyProcId2); 130 ProcedureTestingUtility.assertProcNotFailed(procExecutor, busyProcId1); 131 ProcedureTestingUtility.assertProcNotFailed(procExecutor, busyProcId2); 132 } 133 134 @Test 135 public void testSubmitBatch() throws Exception { 136 Procedure[] procs = new Procedure[5]; 137 for (int i = 0; i < procs.length; ++i) { 138 procs[i] = new NoopProcedure<TestProcEnv>(); 139 } 140 141 // submit procedures 142 createNewExecutor(htu.getConfiguration(), 3); 143 procExecutor.submitProcedures(procs); 144 145 // wait for procs to be completed 146 for (int i = 0; i < procs.length; ++i) { 147 final long procId = procs[i].getProcId(); 148 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 149 ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId); 150 } 151 } 152 153 private int waitThreadCount(final int expectedThreads) { 154 while (procExecutor.isRunning()) { 155 if (procExecutor.getWorkerThreadCount() == expectedThreads) { 156 break; 157 } 158 LOG.debug("waiting for thread count=" + expectedThreads + " current=" 159 + procExecutor.getWorkerThreadCount()); 160 Threads.sleepWithoutInterrupt(250); 161 } 162 return procExecutor.getWorkerThreadCount(); 163 } 164 165 public static class BusyWaitProcedure extends NoopProcedure<TestProcEnv> { 166 private final Semaphore latch; 167 168 public BusyWaitProcedure(final Semaphore latch) { 169 this.latch = latch; 170 } 171 172 @Override 173 protected Procedure[] execute(final TestProcEnv env) { 174 try { 175 LOG.info("worker started " + this); 176 if (!latch.tryAcquire(1, 30, TimeUnit.SECONDS)) { 177 throw new Exception("waited too long"); 178 } 179 180 LOG.info("worker step 2 " + this); 181 if (!latch.tryAcquire(1, 30, TimeUnit.SECONDS)) { 182 throw new Exception("waited too long"); 183 } 184 } catch (Exception e) { 185 LOG.error("got unexpected exception", e); 186 setFailure("BusyWaitProcedure", e); 187 } 188 return null; 189 } 190 } 191 192 private static class TestProcEnv { 193 } 194}