/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.executor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.io.StringWriter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link ExecutorService}: pool sizing and status reporting while handlers are blocked,
 * aborting the server when a handler throws, and active-count tracking for snapshot handlers.
 */
@Category({ MiscTests.class, SmallTests.class })
public class TestExecutorService {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestExecutorService.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestExecutorService.class);

  /**
   * Submits {@code maxThreads} blocking handlers, checks the pool size and status dump while they
   * are blocked, releases them, then submits more handlers than there are threads and finally
   * checks the behaviour of the service after {@code shutdown()}.
   */
  @Test
  public void testExecutorService() throws Exception {
    int maxThreads = 5;
    int maxTries = 10;
    int sleepInterval = 10;

    Server mockedServer = mock(Server.class);
    when(mockedServer.getConfiguration()).thenReturn(HBaseConfiguration.create());

    // Start an executor service pool with max 5 threads
    ExecutorService executorService = new ExecutorService("unit_test");
    executorService.startExecutorService(executorService.new ExecutorConfig()
      .setExecutorType(ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(maxThreads));

    Executor executor = executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS);
    ThreadPoolExecutor pool = executor.threadPoolExecutor;

    // Assert no threads yet
    assertEquals(0, pool.getPoolSize());

    // While true, every TestEventHandler blocks inside process().
    AtomicBoolean lock = new AtomicBoolean(true);
    AtomicInteger counter = new AtomicInteger(0);

    // Submit maxThreads executors.
    for (int i = 0; i < maxThreads; i++) {
      executorService
        .submit(new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, lock, counter));
    }

    // The TestEventHandler will increment counter when it starts.
    waitForCounter(counter, maxThreads, maxTries, sleepInterval,
      "Waiting for all event handlers to start...");

    // Assert that pool is at max threads.
    assertEquals(maxThreads, counter.get());
    assertEquals(maxThreads, pool.getPoolSize());

    ExecutorStatus status = executor.getStatus();
    assertTrue(status.queuedEvents.isEmpty());
    assertEquals(maxThreads, status.running.size());
    checkStatusDump(status);

    // Release the blocked handlers: flip the flag and wake every waiter.
    synchronized (lock) {
      lock.set(false);
      lock.notifyAll();
    }

    // Each handler increments the counter again on its way out, so wait for that. The helper
    // uses its own try counter, so this wait gets the full budget regardless of how many tries
    // the previous wait consumed.
    waitForCounter(counter, maxThreads * 2, maxTries, sleepInterval,
      "Waiting for all event handlers to finish...");

    assertEquals(maxThreads * 2, counter.get());
    assertEquals(maxThreads, pool.getPoolSize());

    // Add more than the number of threads items.
    // Make sure we don't get RejectedExecutionException.
    for (int i = 0; i < (2 * maxThreads); i++) {
      executorService
        .submit(new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, lock, counter));
    }
    // The flag is already false at this point, so these handlers do not block; clearing it again
    // and notifying is a harmless safeguard.
    synchronized (lock) {
      lock.set(false);
      lock.notifyAll();
    }

    // Make sure threads are still around even after their timetolive expires.
    Thread.sleep(ExecutorConfig.KEEP_ALIVE_TIME_MILLIS_DEFAULT * 2);
    assertEquals(maxThreads, pool.getPoolSize());

    executorService.shutdown();

    assertEquals(0, executorService.getAllExecutorStatuses().size());

    // Test that submit doesn't throw NPEs
    executorService
      .submit(new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN, lock, counter));
  }

  /**
   * Polls {@code counter} until it reaches {@code target} or {@code maxTries} sleeps have elapsed.
   * Returns normally in both cases; callers assert on the counter afterwards.
   * @param counter         counter incremented by the event handlers
   * @param target          value to wait for
   * @param maxTries        maximum number of sleeps before giving up
   * @param sleepIntervalMs sleep between polls, in milliseconds
   * @param message         progress message logged before each sleep
   * @throws InterruptedException if the calling thread is interrupted while sleeping
   */
  private static void waitForCounter(AtomicInteger counter, int target, int maxTries,
    long sleepIntervalMs, String message) throws InterruptedException {
    for (int tries = 0; counter.get() < target && tries < maxTries; tries++) {
      LOG.info(message);
      Thread.sleep(sleepIntervalMs);
    }
  }

  /**
   * Dumps {@code status} to a string, logs it, and asserts that it reports a thread waiting on an
   * {@link AtomicBoolean}.
   * @param status executor status to dump
   * @throws IOException declared by this method; propagated from writing the dump
   */
  private void checkStatusDump(ExecutorStatus status) throws IOException {
    StringWriter sw = new StringWriter();
    status.dumpTo(sw, "");
    String dump = sw.toString();
    LOG.info("Got status dump:\n{}", dump);

    // Presumably produced by the handler threads parked in lock.wait() inside
    // TestEventHandler.process() - confirm against ExecutorStatus.dumpTo if this ever changes.
    assertTrue("Status dump does not show a thread waiting on the AtomicBoolean lock",
      dump.contains("Waiting on java.util.concurrent.atomic.AtomicBoolean"));
  }

  /**
   * Event handler that increments {@code counter} on entry, blocks on {@code lock} for as long as
   * its value is {@code true}, and increments {@code counter} once more before returning.
   */
  public static class TestEventHandler extends EventHandler {
    private final AtomicBoolean lock;
    private final AtomicInteger counter;

    /**
     * @param server    server handed to the {@link EventHandler} super constructor
     * @param eventType event type handed to the {@link EventHandler} super constructor
     * @param lock      flag and monitor; the handler waits on it while its value is {@code true}
     * @param counter   incremented once when processing starts and once when it ends
     */
    public TestEventHandler(Server server, EventType eventType, AtomicBoolean lock,
      AtomicInteger counter) {
      super(server, eventType);
      this.lock = lock;
      this.counter = counter;
    }

    @Override
    public void process() throws IOException {
      int num = counter.incrementAndGet();
      LOG.info("Running process #{}, threadName={}", num, Thread.currentThread().getName());
      boolean interrupted = false;
      synchronized (lock) {
        while (lock.get()) {
          try {
            lock.wait();
          } catch (InterruptedException e) {
            // Keep waiting until the flag is cleared, but remember the interrupt. Restoring it
            // here would make the next wait() throw again immediately and spin.
            interrupted = true;
          }
        }
      }
      if (interrupted) {
        // Restore the interrupt status that was consumed by wait().
        Thread.currentThread().interrupt();
      }
      counter.incrementAndGet();
    }
  }

  /**
   * Submits a handler whose {@code process()} throws a {@link RuntimeException} and waits until
   * {@code abort} has been invoked exactly once on the mocked server.
   */
  @Test
  public void testAborting() throws Exception {
    final Configuration conf = HBaseConfiguration.create();
    final Server server = mock(Server.class);
    when(server.getConfiguration()).thenReturn(conf);

    ExecutorService executorService = new ExecutorService("unit_test");
    executorService.startExecutorService(executorService.new ExecutorConfig()
      .setExecutorType(ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(1));

    try {
      executorService.submit(new EventHandler(server, EventType.M_SERVER_SHUTDOWN) {
        @Override
        public void process() throws IOException {
          throw new RuntimeException("Should cause abort");
        }
      });

      // verify() throws while the expected invocation has not happened yet; treat that as
      // "not yet" so that the waiter keeps polling.
      Predicate<Exception> abortInvoked = () -> {
        try {
          verify(server, times(1)).abort(anyString(), any());
          return true;
        } catch (Throwable t) {
          return false;
        }
      };
      Waiter.waitFor(conf, 30000, abortInvoked);
    } finally {
      // Shut down even when the wait above fails.
      executorService.shutdown();
    }
  }

  /**
   * Submits a snapshot handler that blocks on a latch and checks that the pool's active count is 1
   * while the handler is running and drops to 0 once it has been released.
   */
  @Test
  public void testSnapshotHandlers() throws Exception {
    final Configuration conf = HBaseConfiguration.create();
    final Server server = mock(Server.class);
    when(server.getConfiguration()).thenReturn(conf);

    ExecutorService executorService = new ExecutorService("testSnapshotHandlers");
    executorService.startExecutorService(executorService.new ExecutorConfig()
      .setExecutorType(ExecutorType.MASTER_SNAPSHOT_OPERATIONS).setCorePoolSize(1));

    CountDownLatch latch = new CountDownLatch(1);
    CountDownLatch waitForEventToStart = new CountDownLatch(1);
    try {
      executorService.submit(new EventHandler(server, EventType.C_M_SNAPSHOT_TABLE) {
        @Override
        public void process() throws IOException {
          waitForEventToStart.countDown();
          try {
            latch.await();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
      });

      // Wait for the EventHandler to start. await() returns false on timeout; fail here with a
      // clear message instead of tripping the active-count assertion below.
      assertTrue("Event handler did not start within 10 seconds",
        waitForEventToStart.await(10, TimeUnit.SECONDS));
      int activeCount = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS)
        .getThreadPoolExecutor().getActiveCount();
      Assert.assertEquals(1, activeCount);
      latch.countDown();
      Waiter.waitFor(conf, 3000, () -> {
        int count = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS)
          .getThreadPoolExecutor().getActiveCount();
        return count == 0;
      });
    } finally {
      // Release the handler if an assertion failed before the countDown above (a second
      // countDown is a no-op), then stop the service.
      latch.countDown();
      executorService.shutdown();
    }
  }
}