001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.monitoring; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.List; 029import java.util.Map; 030import java.util.concurrent.atomic.AtomicBoolean; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.client.Mutation; 034import org.apache.hadoop.hbase.client.Put; 035import org.apache.hadoop.hbase.client.Query; 036import org.apache.hadoop.hbase.client.Scan; 037import org.apache.hadoop.hbase.testclassification.MiscTests; 038import org.apache.hadoop.hbase.testclassification.SmallTests; 039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 040import org.junit.ClassRule; 041import org.junit.Test; 042import org.junit.experimental.categories.Category; 043import org.skyscreamer.jsonassert.JSONAssert; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047@Category({ MiscTests.class, SmallTests.class }) 048public class TestTaskMonitor { 049 private static final Logger LOG = LoggerFactory.getLogger(TestTaskMonitor.class); 050 051 @ClassRule 052 public static final HBaseClassTestRule CLASS_RULE = 053 HBaseClassTestRule.forClass(TestTaskMonitor.class); 054 055 @Test 056 public void testTaskMonitorBasics() { 057 TaskMonitor tm = new TaskMonitor(new Configuration()); 058 assertTrue("Task monitor should start empty", tm.getTasks().isEmpty()); 059 060 // Make a task and fetch it back out 061 MonitoredTask task = tm.createStatus("Test task"); 062 MonitoredTask taskFromTm = tm.getTasks().get(0); 063 064 // Make sure the state is reasonable. 065 assertEquals(task.getDescription(), taskFromTm.getDescription()); 066 assertEquals(-1, taskFromTm.getCompletionTimestamp()); 067 assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState()); 068 assertEquals(task.getStatus(), taskFromTm.getStatus()); 069 assertEquals("status unset", taskFromTm.getStatus()); 070 071 // Mark it as finished 072 task.markComplete("Finished!"); 073 assertEquals(MonitoredTask.State.COMPLETE, task.getState()); 074 075 // It should still show up in the TaskMonitor list 076 assertEquals(1, tm.getTasks().size()); 077 078 // If we mark its completion time back a few minutes, it should get gced 079 task.expireNow(); 080 assertEquals(0, tm.getTasks().size()); 081 082 tm.shutdown(); 083 } 084 085 @Test 086 public void testTasksGetAbortedOnLeak() throws InterruptedException { 087 final TaskMonitor tm = new TaskMonitor(new Configuration()); 088 assertTrue("Task monitor should start empty", tm.getTasks().isEmpty()); 089 090 final AtomicBoolean threadSuccess = new AtomicBoolean(false); 091 // Make a task in some other thread and leak it 092 Thread t = new Thread() { 093 @Override 094 public void run() { 095 MonitoredTask task = tm.createStatus("Test task"); 096 assertEquals(MonitoredTask.State.RUNNING, task.getState()); 097 threadSuccess.set(true); 098 } 099 }; 100 t.start(); 101 t.join(); 102 // Make sure the thread saw the correct state 103 assertTrue(threadSuccess.get()); 104 105 // Make sure the leaked reference gets cleared 106 System.gc(); 107 System.gc(); 108 System.gc(); 109 110 // Now it should be aborted 111 MonitoredTask taskFromTm = tm.getTasks().get(0); 112 assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState()); 113 114 tm.shutdown(); 115 } 116 117 @Test 118 public void testTaskLimit() throws Exception { 119 TaskMonitor tm = new TaskMonitor(new Configuration()); 120 for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS + 10; i++) { 121 tm.createStatus("task " + i); 122 } 123 // Make sure it was limited correctly 124 assertEquals(TaskMonitor.DEFAULT_MAX_TASKS, tm.getTasks().size()); 125 // Make sure we culled the earlier tasks, not later 126 // (i.e. tasks 0 through 9 should have been deleted) 127 assertEquals("task 10", tm.getTasks().get(0).getDescription()); 128 tm.shutdown(); 129 } 130 131 @Test 132 public void testDoNotPurgeRPCTask() throws Exception { 133 int RPCTaskNums = 10; 134 TaskMonitor tm = TaskMonitor.get(); 135 for (int i = 0; i < RPCTaskNums; i++) { 136 tm.createRPCStatus("PRCTask" + i); 137 } 138 for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS; i++) { 139 tm.createStatus("otherTask" + i); 140 } 141 int remainRPCTask = 0; 142 for (MonitoredTask task : tm.getTasks()) { 143 if (task instanceof MonitoredRPCHandler) { 144 remainRPCTask++; 145 } 146 } 147 assertEquals("RPC Tasks have been purged!", RPCTaskNums, remainRPCTask); 148 tm.shutdown(); 149 } 150 151 @Test 152 public void testWarnStuckTasks() throws Exception { 153 final int RPC_WARN_TIME = 1500; 154 final int MONITOR_INTERVAL = 500; 155 Configuration conf = new Configuration(); 156 conf.setLong(TaskMonitor.RPC_WARN_TIME_KEY, RPC_WARN_TIME); 157 conf.setLong(TaskMonitor.MONITOR_INTERVAL_KEY, MONITOR_INTERVAL); 158 final TaskMonitor tm = new TaskMonitor(conf); 159 MonitoredRPCHandler t = tm.createRPCStatus("test task"); 160 long beforeSetRPC = EnvironmentEdgeManager.currentTime(); 161 assertTrue("Validating initialization assumption", t.getWarnTime() <= beforeSetRPC); 162 Thread.sleep(MONITOR_INTERVAL * 2); 163 t.setRPC("testMethod", new Object[0], beforeSetRPC); 164 long afterSetRPC = EnvironmentEdgeManager.currentTime(); 165 Thread.sleep(MONITOR_INTERVAL * 2); 166 assertTrue("Validating no warn after starting RPC", t.getWarnTime() <= afterSetRPC); 167 Thread.sleep(MONITOR_INTERVAL * 2); 168 assertTrue("Validating warn after RPC_WARN_TIME", t.getWarnTime() > afterSetRPC); 169 tm.shutdown(); 170 } 171 172 @Test 173 public void testGetTasksWithFilter() throws Exception { 174 TaskMonitor tm = new TaskMonitor(new Configuration()); 175 assertTrue("Task monitor should start empty", tm.getTasks().isEmpty()); 176 // Create 5 general tasks 177 tm.createStatus("General task1"); 178 tm.createStatus("General task2"); 179 tm.createStatus("General task3"); 180 tm.createStatus("General task4"); 181 tm.createStatus("General task5"); 182 // Create 5 rpc tasks, and mark 1 completed 183 int length = 5; 184 ArrayList<MonitoredRPCHandler> rpcHandlers = new ArrayList<>(length); 185 for (int i = 0; i < length; i++) { 186 MonitoredRPCHandler rpcHandler = tm.createRPCStatus("Rpc task" + i); 187 rpcHandlers.add(rpcHandler); 188 } 189 // Create rpc opertions 190 byte[] row = new byte[] { 0x01 }; 191 Mutation m = new Put(row); 192 Query q = new Scan(); 193 String notOperation = "for test"; 194 rpcHandlers.get(0).setRPC("operations", new Object[] { m, q }, 3000); 195 rpcHandlers.get(1).setRPC("operations", new Object[] { m, q }, 3000); 196 rpcHandlers.get(2).setRPC("operations", new Object[] { m, q }, 3000); 197 rpcHandlers.get(3).setRPC("operations", new Object[] { notOperation }, 3000); 198 rpcHandlers.get(4).setRPC("operations", new Object[] { m, q }, 3000); 199 MonitoredRPCHandler completed = rpcHandlers.get(4); 200 completed.markComplete("Completed!"); 201 // Test get tasks with filter 202 List<MonitoredTask> generalTasks = tm.getTasks("general"); 203 assertEquals(5, generalTasks.size()); 204 List<MonitoredTask> handlerTasks = tm.getTasks("handler"); 205 assertEquals(5, handlerTasks.size()); 206 List<MonitoredTask> rpcTasks = tm.getTasks("rpc"); 207 // The last rpc handler is stopped 208 assertEquals(4, rpcTasks.size()); 209 List<MonitoredTask> operationTasks = tm.getTasks("operation"); 210 // Handler 3 doesn't handle Operation. 211 assertEquals(3, operationTasks.size()); 212 tm.shutdown(); 213 } 214 215 @Test 216 public void testStatusJournal() { 217 TaskMonitor tm = new TaskMonitor(new Configuration()); 218 MonitoredTask task = tm.createStatus("Test task"); 219 assertTrue(task.getStatusJournal().isEmpty()); 220 task.setStatus("status1"); 221 // journal should be empty since it is disabled 222 assertTrue(task.getStatusJournal().isEmpty()); 223 task = tm.createStatus("Test task with journal", false, true); 224 task.setStatus("status2"); 225 assertEquals(1, task.getStatusJournal().size()); 226 assertEquals("status2", task.getStatusJournal().get(0).getStatus()); 227 task.setStatus("status3"); 228 assertEquals(2, task.getStatusJournal().size()); 229 assertEquals("status3", task.getStatusJournal().get(1).getStatus()); 230 task.prettyPrintJournal(); 231 tm.shutdown(); 232 } 233 234 @Test 235 public void testTaskGroup() { 236 TaskGroup group = TaskMonitor.createTaskGroup(true, "test task group"); 237 group.addTask("task1"); 238 MonitoredTask task2 = group.addTask("task2"); 239 task2.setStatus("task2 status2"); 240 task2.setStatus("task2 status3"); 241 group.addTask("task3"); 242 group.markComplete("group complete"); 243 Collection<MonitoredTask> tasks = group.getTasks(); 244 assertNotNull(tasks); 245 assertEquals(tasks.size(), 3); 246 for (MonitoredTask task : tasks) { 247 if (task.getDescription().equals("task2")) { 248 assertEquals(task.getStatusJournal().size(), 3); 249 task.prettyPrintJournal(); 250 } 251 } 252 } 253 254 @Test 255 public void testClone() throws Exception { 256 MonitoredRPCHandlerImpl monitor = new MonitoredRPCHandlerImpl("test"); 257 monitor.abort("abort RPC"); 258 TestParam testParam = new TestParam("param1"); 259 monitor.setRPC("method1", new Object[] { testParam }, 0); 260 MonitoredRPCHandlerImpl clone = monitor.clone(); 261 assertEquals(clone.getDescription(), monitor.getDescription()); 262 assertEquals(clone.getState(), monitor.getState()); 263 assertEquals(clone.getStatus(), monitor.getStatus()); 264 assertEquals(clone.toString(), monitor.toString()); 265 assertEquals(clone.toMap(), monitor.toMap()); 266 JSONAssert.assertEquals(clone.toJSON(), monitor.toJSON(), true); 267 268 // mark complete and make param dirty 269 monitor.markComplete("complete RPC"); 270 testParam.setParam("dirtyParam"); 271 assertEquals(clone.getDescription(), monitor.getDescription()); 272 assertNotEquals(clone.getState(), monitor.getState()); 273 assertNotEquals(clone.getStatus(), monitor.getStatus()); 274 monitor.setState(MonitoredTask.State.RUNNING); 275 try { 276 // when markComplete, the param in monitor is set null, so toMap should fail here 277 monitor.toMap(); 278 fail("Should not call toMap successfully, because param=null"); 279 } catch (Exception e) { 280 } 281 // the param of clone monitor should not be dirty 282 assertNotEquals("[dirtyString]", 283 String.valueOf(((Map<String, Object>) clone.toMap().get("rpcCall")).get("params"))); 284 285 monitor.resume("resume"); 286 monitor.setRPC("method2", new Object[] { new TestParam("param2") }, 1); 287 assertNotEquals(((Map<String, Object>) clone.toMap().get("rpcCall")).get("params"), 288 ((Map<String, Object>) monitor.toMap().get("rpcCall")).get("params")); 289 LOG.info(String.valueOf(clone.toMap())); 290 LOG.info(String.valueOf(monitor.toMap())); 291 assertNotEquals(clone.toString(), monitor.toString()); 292 assertNotEquals(clone.getRPCQueueTime(), monitor.getRPCQueueTime()); 293 assertNotEquals(clone.toMap(), monitor.toMap()); 294 assertNotEquals(clone.toJSON(), monitor.toJSON()); 295 } 296 297 private class TestParam { 298 public String param = null; 299 300 public TestParam(String param) { 301 this.param = param; 302 } 303 304 public void setParam(String param) { 305 this.param = param; 306 } 307 308 @Override 309 public String toString() { 310 return param; 311 } 312 } 313}