001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 021import static org.hamcrest.CoreMatchers.is; 022import static org.hamcrest.CoreMatchers.not; 023import static org.hamcrest.MatcherAssert.assertThat; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertTrue; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.when; 028 029import java.util.List; 030import java.util.Objects; 031import java.util.concurrent.atomic.LongAdder; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.CoordinatedStateManager; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseConfiguration; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.SplitLogCounters; 039import org.apache.hadoop.hbase.SplitLogTask; 040import org.apache.hadoop.hbase.Waiter; 041import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; 042import org.apache.hadoop.hbase.executor.ExecutorService; 043import org.apache.hadoop.hbase.executor.ExecutorType; 044import org.apache.hadoop.hbase.testclassification.MediumTests; 045import org.apache.hadoop.hbase.testclassification.RegionServerTests; 046import org.apache.hadoop.hbase.util.CancelableProgressable; 047import org.apache.hadoop.hbase.util.MockServer; 048import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; 049import org.apache.hadoop.hbase.zookeeper.ZKUtil; 050import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 051import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 052import org.apache.zookeeper.CreateMode; 053import org.apache.zookeeper.ZooDefs.Ids; 054import org.junit.After; 055import org.junit.Before; 056import org.junit.ClassRule; 057import org.junit.Test; 058import org.junit.experimental.categories.Category; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062@Category({ RegionServerTests.class, MediumTests.class }) 063public class TestSplitLogWorker { 064 065 @ClassRule 066 public static final HBaseClassTestRule CLASS_RULE = 067 HBaseClassTestRule.forClass(TestSplitLogWorker.class); 068 069 private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogWorker.class); 070 private static final int WAIT_TIME = 15000; 071 private final ServerName MANAGER = ServerName.valueOf("manager,1,1"); 072 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 073 private DummyServer ds; 074 private ZKWatcher zkw; 075 private SplitLogWorker slw; 076 private ExecutorService executorService; 077 078 static class DummyServer extends MockServer { 079 private ZKWatcher zkw; 080 private Configuration conf; 081 private CoordinatedStateManager cm; 082 083 public DummyServer(ZKWatcher zkw, Configuration conf) { 084 this.zkw = zkw; 085 this.conf = conf; 086 cm = new ZkCoordinatedStateManager(this); 087 } 088 089 @Override 090 public Configuration getConfiguration() { 091 return conf; 092 } 093 094 @Override 095 public ZKWatcher getZooKeeper() { 096 return zkw; 097 } 098 099 @Override 100 public CoordinatedStateManager getCoordinatedStateManager() { 101 return cm; 102 } 103 } 104 105 private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems) 106 throws Exception { 107 assertTrue("ctr=" + ctr.sum() + ", oldval=" + oldval + ", newval=" + newval, 108 waitForCounterBoolean(ctr, oldval, newval, timems)); 109 } 110 111 private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, long newval, 112 long timems) throws Exception { 113 114 return waitForCounterBoolean(ctr, oldval, newval, timems, true); 115 } 116 117 private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, final long newval, 118 long timems, boolean failIfTimeout) throws Exception { 119 120 long timeWaited = 121 TEST_UTIL.waitFor(timems, 10, failIfTimeout, new Waiter.Predicate<Exception>() { 122 @Override 123 public boolean evaluate() throws Exception { 124 return (ctr.sum() >= newval); 125 } 126 }); 127 128 if (timeWaited > 0) { 129 // when not timed out 130 assertEquals(newval, ctr.sum()); 131 } 132 return true; 133 } 134 135 @Before 136 public void setup() throws Exception { 137 TEST_UTIL.startMiniZKCluster(); 138 Configuration conf = TEST_UTIL.getConfiguration(); 139 zkw = new ZKWatcher(TEST_UTIL.getConfiguration(), "split-log-worker-tests", null); 140 ds = new DummyServer(zkw, conf); 141 ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode); 142 ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode); 143 assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode), not(is(-1))); 144 LOG.debug(zkw.getZNodePaths().baseZNode + " created"); 145 ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode); 146 assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode), not(is(-1))); 147 148 LOG.debug(zkw.getZNodePaths().splitLogZNode + " created"); 149 ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().rsZNode); 150 assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().rsZNode), not(is(-1))); 151 152 SplitLogCounters.resetCounters(); 153 executorService = new ExecutorService("TestSplitLogWorker"); 154 executorService.startExecutorService(executorService.new ExecutorConfig() 155 .setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(10)); 156 } 157 158 @After 159 public void teardown() throws Exception { 160 if (executorService != null) { 161 executorService.shutdown(); 162 } 163 TEST_UTIL.shutdownMiniZKCluster(); 164 } 165 166 SplitLogWorker.TaskExecutor neverEndingTask = new SplitLogWorker.TaskExecutor() { 167 168 @Override 169 public Status exec(String name, CancelableProgressable p) { 170 while (true) { 171 try { 172 Thread.sleep(1000); 173 } catch (InterruptedException e) { 174 return Status.PREEMPTED; 175 } 176 if (!p.progress()) { 177 return Status.PREEMPTED; 178 } 179 } 180 } 181 182 }; 183 184 @Test 185 public void testAcquireTaskAtStartup() throws Exception { 186 LOG.info("testAcquireTaskAtStartup"); 187 SplitLogCounters.resetCounters(); 188 final String TATAS = "tatas"; 189 final ServerName RS = ServerName.valueOf("rs,1,1"); 190 RegionServerServices mockedRS = getRegionServer(RS); 191 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS), 192 new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE, 193 CreateMode.PERSISTENT); 194 195 SplitLogWorker slw = 196 new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); 197 slw.start(); 198 try { 199 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); 200 byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS)); 201 SplitLogTask slt = SplitLogTask.parseFrom(bytes); 202 assertTrue(slt.isOwned(RS)); 203 } finally { 204 stopSplitLogWorker(slw); 205 } 206 } 207 208 private void stopSplitLogWorker(final SplitLogWorker slw) throws InterruptedException { 209 if (slw != null) { 210 slw.stop(); 211 slw.worker.join(WAIT_TIME); 212 if (slw.worker.isAlive()) { 213 assertTrue(("Could not stop the worker thread slw=" + slw) == null); 214 } 215 } 216 } 217 218 @Test 219 public void testRaceForTask() throws Exception { 220 LOG.info("testRaceForTask"); 221 SplitLogCounters.resetCounters(); 222 final String TRFT = "trft"; 223 final ServerName SVR1 = ServerName.valueOf("svr1,1,1"); 224 final ServerName SVR2 = ServerName.valueOf("svr2,1,1"); 225 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT), 226 new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE, 227 CreateMode.PERSISTENT); 228 RegionServerServices mockedRS1 = getRegionServer(SVR1); 229 RegionServerServices mockedRS2 = getRegionServer(SVR2); 230 SplitLogWorker slw1 = 231 new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask); 232 SplitLogWorker slw2 = 233 new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask); 234 slw1.start(); 235 slw2.start(); 236 try { 237 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); 238 // Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if 239 // not it, that we fell through to the next counter in line and it was set. 240 assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1, 241 WAIT_TIME, false) || SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.sum() == 1); 242 byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT)); 243 SplitLogTask slt = SplitLogTask.parseFrom(bytes); 244 assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2)); 245 } finally { 246 stopSplitLogWorker(slw1); 247 stopSplitLogWorker(slw2); 248 } 249 } 250 251 @Test 252 public void testPreemptTask() throws Exception { 253 LOG.info("testPreemptTask"); 254 SplitLogCounters.resetCounters(); 255 final ServerName SRV = ServerName.valueOf("tpt_svr,1,1"); 256 final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"); 257 RegionServerServices mockedRS = getRegionServer(SRV); 258 SplitLogWorker slw = 259 new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); 260 slw.start(); 261 try { 262 Thread.yield(); // let the worker start 263 Thread.sleep(1000); 264 waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME); 265 266 // this time create a task node after starting the splitLogWorker 267 zkw.getRecoverableZooKeeper().create(PATH, new SplitLogTask.Unassigned(MANAGER).toByteArray(), 268 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 269 270 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); 271 assertEquals(1, slw.getTaskReadySeq()); 272 byte[] bytes = ZKUtil.getData(zkw, PATH); 273 SplitLogTask slt = SplitLogTask.parseFrom(bytes); 274 assertTrue(slt.isOwned(SRV)); 275 slt = new SplitLogTask.Owned(MANAGER); 276 ZKUtil.setData(zkw, PATH, slt.toByteArray()); 277 waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME); 278 } finally { 279 stopSplitLogWorker(slw); 280 } 281 } 282 283 @Test 284 public void testMultipleTasks() throws Exception { 285 LOG.info("testMultipleTasks"); 286 SplitLogCounters.resetCounters(); 287 final ServerName SRV = ServerName.valueOf("tmt_svr,1,1"); 288 final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"); 289 RegionServerServices mockedRS = getRegionServer(SRV); 290 SplitLogWorker slw = 291 new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); 292 slw.start(); 293 try { 294 Thread.yield(); // let the worker start 295 Thread.sleep(100); 296 waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME); 297 298 SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER); 299 zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(), 300 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 301 302 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); 303 // now the worker is busy doing the above task 304 305 // create another task 306 final String PATH2 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"); 307 zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(), 308 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 309 310 // preempt the first task, have it owned by another worker 311 final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1"); 312 SplitLogTask slt = new SplitLogTask.Owned(anotherWorker); 313 ZKUtil.setData(zkw, PATH1, slt.toByteArray()); 314 waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME); 315 316 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME); 317 assertEquals(2, slw.getTaskReadySeq()); 318 byte[] bytes = ZKUtil.getData(zkw, PATH2); 319 slt = SplitLogTask.parseFrom(bytes); 320 assertTrue(slt.isOwned(SRV)); 321 } finally { 322 stopSplitLogWorker(slw); 323 } 324 } 325 326 @Test 327 public void testRescan() throws Exception { 328 LOG.info("testRescan"); 329 SplitLogCounters.resetCounters(); 330 final ServerName SRV = ServerName.valueOf("svr,1,1"); 331 RegionServerServices mockedRS = getRegionServer(SRV); 332 slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); 333 slw.start(); 334 Thread.yield(); // let the worker start 335 Thread.sleep(100); 336 337 String task = ZKSplitLog.getEncodedNodeName(zkw, "task"); 338 SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER); 339 zkw.getRecoverableZooKeeper().create(task, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, 340 CreateMode.PERSISTENT); 341 342 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); 343 // now the worker is busy doing the above task 344 345 // preempt the task, have it owned by another worker 346 ZKUtil.setData(zkw, task, slt.toByteArray()); 347 waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME); 348 349 // create a RESCAN node 350 String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"); 351 rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, 352 CreateMode.PERSISTENT_SEQUENTIAL); 353 354 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME); 355 // RESCAN node might not have been processed if the worker became busy 356 // with the above task. preempt the task again so that now the RESCAN 357 // node is processed 358 ZKUtil.setData(zkw, task, slt.toByteArray()); 359 waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME); 360 waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME); 361 362 List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().splitLogZNode); 363 LOG.debug(Objects.toString(nodes)); 364 int num = 0; 365 for (String node : nodes) { 366 num++; 367 if (node.startsWith("RESCAN")) { 368 String name = ZKSplitLog.getEncodedNodeName(zkw, node); 369 String fn = ZKSplitLog.getFileName(name); 370 byte[] data = 371 ZKUtil.getData(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().splitLogZNode, fn)); 372 slt = SplitLogTask.parseFrom(data); 373 assertTrue(slt.toString(), slt.isDone(SRV)); 374 } 375 } 376 assertEquals(2, num); 377 } 378 379 @Test 380 public void testAcquireMultiTasks() throws Exception { 381 LOG.info("testAcquireMultiTasks"); 382 SplitLogCounters.resetCounters(); 383 final String TATAS = "tatas"; 384 final ServerName RS = ServerName.valueOf("rs,1,1"); 385 final int maxTasks = 3; 386 Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 387 testConf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, maxTasks); 388 RegionServerServices mockedRS = getRegionServer(RS); 389 for (int i = 0; i < maxTasks; i++) { 390 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i), 391 new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), 392 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 393 } 394 395 SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask); 396 slw.start(); 397 try { 398 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME); 399 for (int i = 0; i < maxTasks; i++) { 400 byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i)); 401 SplitLogTask slt = SplitLogTask.parseFrom(bytes); 402 assertTrue(slt.isOwned(RS)); 403 } 404 } finally { 405 stopSplitLogWorker(slw); 406 } 407 } 408 409 /** 410 * Create a mocked region server service instance 411 */ 412 private RegionServerServices getRegionServer(ServerName name) { 413 414 RegionServerServices mockedServer = mock(RegionServerServices.class); 415 when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration()); 416 when(mockedServer.getServerName()).thenReturn(name); 417 when(mockedServer.getZooKeeper()).thenReturn(zkw); 418 when(mockedServer.isStopped()).thenReturn(false); 419 when(mockedServer.getExecutorService()).thenReturn(executorService); 420 421 return mockedServer; 422 } 423 424}