001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.namequeues; 019 020import java.io.IOException; 021import java.lang.reflect.Constructor; 022import java.net.InetAddress; 023import java.security.PrivilegedAction; 024import java.security.PrivilegedExceptionAction; 025import java.security.cert.X509Certificate; 026import java.util.Collections; 027import java.util.List; 028import java.util.Map; 029import java.util.Optional; 030import java.util.concurrent.CompletableFuture; 031import java.util.concurrent.TimeUnit; 032import java.util.stream.Collectors; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.ExtendedCellScanner; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.ipc.RpcCall; 039import org.apache.hadoop.hbase.ipc.RpcCallback; 040import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 041import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 042import org.apache.hadoop.hbase.security.User; 043import org.apache.hadoop.hbase.testclassification.MasterTests; 044import org.apache.hadoop.hbase.testclassification.MediumTests; 045import org.junit.Assert; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; 054import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 055import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 056import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; 057import org.apache.hbase.thirdparty.com.google.protobuf.Message; 058 059import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; 064 065/** 066 * Tests for Online SlowLog Provider Service 067 */ 068@Category({ MasterTests.class, MediumTests.class }) 069public class TestNamedQueueRecorder { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestNamedQueueRecorder.class); 074 075 private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class); 076 077 private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil(); 078 private static final List<HBaseProtos.NameBytesPair> REQUEST_HEADERS = 079 ImmutableList.<HBaseProtos.NameBytesPair> builder() 080 .add(HBaseProtos.NameBytesPair.newBuilder().setName("1") 081 .setValue(ByteString.copyFromUtf8("r")).build()) 082 .add(HBaseProtos.NameBytesPair.newBuilder().setName("2") 083 .setValue(ByteString.copyFromUtf8("h")).build()) 084 .build(); 085 private static final List<HBaseProtos.NameBytesPair> CONNECTION_HEADERS = 086 ImmutableList.<HBaseProtos.NameBytesPair> builder() 087 .add(HBaseProtos.NameBytesPair.newBuilder().setName("1") 088 .setValue(ByteString.copyFromUtf8("c")).build()) 089 .add(HBaseProtos.NameBytesPair.newBuilder().setName("2") 090 .setValue(ByteString.copyFromUtf8("h")).build()) 091 .build(); 092 093 private NamedQueueRecorder namedQueueRecorder; 094 095 private static int i = 0; 096 097 private static Configuration applySlowLogRecorderConf(int eventSize) { 098 Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); 099 conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); 100 conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize); 101 return conf; 102 } 103 104 /** 105 * confirm that for a ringbuffer of slow logs, payload on given index of buffer has expected 106 * elements 107 * @param i index of ringbuffer logs 108 * @param j data value that was put on index i 109 * @param slowLogPayloads list of payload retrieved from {@link NamedQueueRecorder} 110 * @return if actual values are as per expectations 111 */ 112 private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) { 113 boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j); 114 boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j); 115 boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j); 116 return isClassExpected && isClientExpected && isUserExpected; 117 } 118 119 @Test 120 public void testOnlieSlowLogConsumption() throws Exception { 121 122 Configuration conf = applySlowLogRecorderConf(8); 123 Constructor<NamedQueueRecorder> constructor = 124 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 125 constructor.setAccessible(true); 126 namedQueueRecorder = constructor.newInstance(conf); 127 AdminProtos.SlowLogResponseRequest request = 128 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); 129 130 namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); 131 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 132 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 133 134 int i = 0; 135 136 // add 5 records initially 137 for (; i < 5; i++) { 138 RpcLogDetails rpcLogDetails = 139 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 140 namedQueueRecorder.addRecord(rpcLogDetails); 141 } 142 143 Assert.assertNotEquals(-1, 144 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 5)); 145 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 146 Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads)); 147 Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads)); 148 Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads)); 149 Assert.assertTrue(confirmPayloadParams(3, 2, slowLogPayloads)); 150 Assert.assertTrue(confirmPayloadParams(4, 1, slowLogPayloads)); 151 152 // add 2 more records 153 for (; i < 7; i++) { 154 RpcLogDetails rpcLogDetails = 155 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 156 namedQueueRecorder.addRecord(rpcLogDetails); 157 } 158 159 Assert.assertNotEquals(-1, 160 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 7)); 161 162 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 163 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); 164 return slowLogPayloadsList.size() == 7 && confirmPayloadParams(0, 7, slowLogPayloadsList) 165 && confirmPayloadParams(5, 2, slowLogPayloadsList) 166 && confirmPayloadParams(6, 1, slowLogPayloadsList); 167 })); 168 169 // add 3 more records 170 for (; i < 10; i++) { 171 RpcLogDetails rpcLogDetails = 172 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 173 namedQueueRecorder.addRecord(rpcLogDetails); 174 } 175 176 Assert.assertNotEquals(-1, 177 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 8)); 178 179 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 180 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); 181 // confirm ringbuffer is full 182 return slowLogPayloadsList.size() == 8 && confirmPayloadParams(7, 3, slowLogPayloadsList) 183 && confirmPayloadParams(0, 10, slowLogPayloadsList) 184 && confirmPayloadParams(1, 9, slowLogPayloadsList); 185 })); 186 187 // add 4 more records 188 for (; i < 14; i++) { 189 RpcLogDetails rpcLogDetails = 190 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 191 namedQueueRecorder.addRecord(rpcLogDetails); 192 } 193 194 Assert.assertNotEquals(-1, 195 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 8)); 196 197 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 198 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); 199 // confirm ringbuffer is full 200 // and ordered events 201 return slowLogPayloadsList.size() == 8 && confirmPayloadParams(0, 14, slowLogPayloadsList) 202 && confirmPayloadParams(1, 13, slowLogPayloadsList) 203 && confirmPayloadParams(2, 12, slowLogPayloadsList) 204 && confirmPayloadParams(3, 11, slowLogPayloadsList); 205 })); 206 207 AdminProtos.SlowLogResponseRequest largeLogRequest = 208 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15) 209 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build(); 210 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 211 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(largeLogRequest); 212 // confirm ringbuffer is full 213 // and ordered events 214 return slowLogPayloadsList.size() == 8 && confirmPayloadParams(0, 14, slowLogPayloadsList) 215 && confirmPayloadParams(1, 13, slowLogPayloadsList) 216 && confirmPayloadParams(2, 12, slowLogPayloadsList) 217 && confirmPayloadParams(3, 11, slowLogPayloadsList); 218 })); 219 220 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 221 boolean isRingBufferCleaned = 222 namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); 223 224 LOG.debug("cleared the ringbuffer of Online Slow Log records"); 225 226 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); 227 // confirm ringbuffer is empty 228 return slowLogPayloadsList.size() == 0 && isRingBufferCleaned; 229 })); 230 231 } 232 233 private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) { 234 NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); 235 namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT); 236 namedQueueGetRequest.setSlowLogResponseRequest(request); 237 NamedQueueGetResponse namedQueueGetResponse = 238 namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); 239 return namedQueueGetResponse == null 240 ? Collections.emptyList() 241 : namedQueueGetResponse.getSlowLogPayloads(); 242 } 243 244 @Test 245 public void testOnlineSlowLogWithHighRecords() throws Exception { 246 247 Configuration conf = applySlowLogRecorderConf(14); 248 Constructor<NamedQueueRecorder> constructor = 249 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 250 constructor.setAccessible(true); 251 namedQueueRecorder = constructor.newInstance(conf); 252 AdminProtos.SlowLogResponseRequest request = 253 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build(); 254 255 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 256 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 257 258 for (int i = 0; i < 14 * 11; i++) { 259 RpcLogDetails rpcLogDetails = 260 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 261 namedQueueRecorder.addRecord(rpcLogDetails); 262 } 263 LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records"); 264 265 Assert.assertNotEquals(-1, 266 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14)); 267 268 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 269 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 270 271 // confirm strict order of slow log payloads 272 return slowLogPayloads.size() == 14 && confirmPayloadParams(0, 154, slowLogPayloads) 273 && confirmPayloadParams(1, 153, slowLogPayloads) 274 && confirmPayloadParams(2, 152, slowLogPayloads) 275 && confirmPayloadParams(3, 151, slowLogPayloads) 276 && confirmPayloadParams(4, 150, slowLogPayloads) 277 && confirmPayloadParams(5, 149, slowLogPayloads) 278 && confirmPayloadParams(6, 148, slowLogPayloads) 279 && confirmPayloadParams(7, 147, slowLogPayloads) 280 && confirmPayloadParams(8, 146, slowLogPayloads) 281 && confirmPayloadParams(9, 145, slowLogPayloads) 282 && confirmPayloadParams(10, 144, slowLogPayloads) 283 && confirmPayloadParams(11, 143, slowLogPayloads) 284 && confirmPayloadParams(12, 142, slowLogPayloads) 285 && confirmPayloadParams(13, 141, slowLogPayloads); 286 })); 287 288 boolean isRingBufferCleaned = 289 namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); 290 Assert.assertTrue(isRingBufferCleaned); 291 LOG.debug("cleared the ringbuffer of Online Slow Log records"); 292 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 293 294 // confirm ringbuffer is empty 295 Assert.assertEquals(slowLogPayloads.size(), 0); 296 } 297 298 @Test 299 public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception { 300 Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); 301 conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY); 302 303 Constructor<NamedQueueRecorder> constructor = 304 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 305 constructor.setAccessible(true); 306 namedQueueRecorder = constructor.newInstance(conf); 307 AdminProtos.SlowLogResponseRequest request = 308 AdminProtos.SlowLogResponseRequest.newBuilder().build(); 309 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 310 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 311 for (int i = 0; i < 300; i++) { 312 RpcLogDetails rpcLogDetails = 313 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 314 namedQueueRecorder.addRecord(rpcLogDetails); 315 } 316 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 317 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 318 return slowLogPayloads.size() == 0; 319 })); 320 321 } 322 323 @Test 324 public void testOnlineSlowLogWithDisableConfig() throws Exception { 325 Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); 326 conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false); 327 Constructor<NamedQueueRecorder> constructor = 328 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 329 constructor.setAccessible(true); 330 namedQueueRecorder = constructor.newInstance(conf); 331 332 AdminProtos.SlowLogResponseRequest request = 333 AdminProtos.SlowLogResponseRequest.newBuilder().build(); 334 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 335 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 336 for (int i = 0; i < 300; i++) { 337 RpcLogDetails rpcLogDetails = 338 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 339 namedQueueRecorder.addRecord(rpcLogDetails); 340 } 341 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 342 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 343 return slowLogPayloads.size() == 0; 344 })); 345 conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); 346 } 347 348 @Test 349 public void testSlowLogFilters() throws Exception { 350 351 Configuration conf = applySlowLogRecorderConf(30); 352 Constructor<NamedQueueRecorder> constructor = 353 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 354 constructor.setAccessible(true); 355 namedQueueRecorder = constructor.newInstance(conf); 356 AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder() 357 .setLimit(15).setUserName("userName_87").build(); 358 359 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 360 361 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 362 363 for (int i = 0; i < 100; i++) { 364 RpcLogDetails rpcLogDetails = 365 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 366 namedQueueRecorder.addRecord(rpcLogDetails); 367 } 368 LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter"); 369 370 Assert.assertNotEquals(-1, 371 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 1)); 372 373 AdminProtos.SlowLogResponseRequest requestClient = AdminProtos.SlowLogResponseRequest 374 .newBuilder().setLimit(15).setClientAddress("client_85").build(); 375 Assert.assertNotEquals(-1, 376 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestClient).size() == 1)); 377 378 AdminProtos.SlowLogResponseRequest requestSlowLog = 379 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); 380 Assert.assertNotEquals(-1, 381 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15)); 382 } 383 384 @Test 385 public void testSlowLogFilterWithClientAddress() throws Exception { 386 Configuration conf = applySlowLogRecorderConf(10); 387 Constructor<NamedQueueRecorder> constructor = 388 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 389 constructor.setAccessible(true); 390 namedQueueRecorder = constructor.newInstance(conf); 391 AdminProtos.SlowLogResponseRequest request = 392 AdminProtos.SlowLogResponseRequest.newBuilder().build(); 393 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 394 395 String[] clientAddressArray = new String[] { "[127:1:1:1:1:1:1:1]:1", "[127:1:1:1:1:1:1:1]:2", 396 "[127:1:1:1:1:1:1:1]:3", "127.0.0.1:1", "127.0.0.1:2" }; 397 boolean isSlowLog; 398 boolean isLargeLog; 399 for (int i = 0; i < 10; i++) { 400 if (i % 2 == 0) { 401 isSlowLog = true; 402 isLargeLog = false; 403 } else { 404 isSlowLog = false; 405 isLargeLog = true; 406 } 407 RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), 408 clientAddressArray[i % 5], "class_" + (i + 1), isSlowLog, isLargeLog); 409 namedQueueRecorder.addRecord(rpcLogDetails); 410 } 411 412 AdminProtos.SlowLogResponseRequest largeLogRequestIPv6WithPort = 413 AdminProtos.SlowLogResponseRequest.newBuilder() 414 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG) 415 .setClientAddress("[127:1:1:1:1:1:1:1]:2").build(); 416 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 417 () -> getSlowLogPayloads(largeLogRequestIPv6WithPort).size() == 1)); 418 AdminProtos.SlowLogResponseRequest largeLogRequestIPv6WithoutPort = 419 AdminProtos.SlowLogResponseRequest.newBuilder() 420 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG) 421 .setClientAddress("[127:1:1:1:1:1:1:1]").build(); 422 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 423 () -> getSlowLogPayloads(largeLogRequestIPv6WithoutPort).size() == 3)); 424 AdminProtos.SlowLogResponseRequest largeLogRequestIPv4WithPort = 425 AdminProtos.SlowLogResponseRequest.newBuilder() 426 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG) 427 .setClientAddress("127.0.0.1:1").build(); 428 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 429 () -> getSlowLogPayloads(largeLogRequestIPv4WithPort).size() == 1)); 430 AdminProtos.SlowLogResponseRequest largeLogRequestIPv4WithoutPort = 431 AdminProtos.SlowLogResponseRequest.newBuilder() 432 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG) 433 .setClientAddress("127.0.0.1").build(); 434 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 435 () -> getSlowLogPayloads(largeLogRequestIPv4WithoutPort).size() == 2)); 436 } 437 438 @Test 439 public void testConcurrentSlowLogEvents() throws Exception { 440 441 Configuration conf = applySlowLogRecorderConf(50000); 442 Constructor<NamedQueueRecorder> constructor = 443 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 444 constructor.setAccessible(true); 445 namedQueueRecorder = constructor.newInstance(conf); 446 AdminProtos.SlowLogResponseRequest request = 447 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build(); 448 AdminProtos.SlowLogResponseRequest largeLogRequest = 449 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000) 450 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build(); 451 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 452 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 453 454 for (int j = 0; j < 1000; j++) { 455 456 CompletableFuture.runAsync(() -> { 457 for (int i = 0; i < 3500; i++) { 458 RpcLogDetails rpcLogDetails = 459 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 460 namedQueueRecorder.addRecord(rpcLogDetails); 461 } 462 }); 463 464 } 465 466 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); 467 468 Assert.assertNotEquals(-1, 469 HBASE_TESTING_UTILITY.waitFor(5000, () -> getSlowLogPayloads(request).size() > 10000)); 470 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(5000, 471 () -> getSlowLogPayloads(largeLogRequest).size() > 10000)); 472 } 473 474 @Test 475 public void testSlowLargeLogEvents() throws Exception { 476 Configuration conf = applySlowLogRecorderConf(28); 477 Constructor<NamedQueueRecorder> constructor = 478 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 479 constructor.setAccessible(true); 480 namedQueueRecorder = constructor.newInstance(conf); 481 482 AdminProtos.SlowLogResponseRequest request = 483 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build(); 484 485 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 486 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 487 488 boolean isSlowLog; 489 boolean isLargeLog; 490 for (int i = 0; i < 14 * 11; i++) { 491 if (i % 2 == 0) { 492 isSlowLog = true; 493 isLargeLog = false; 494 } else { 495 isSlowLog = false; 496 isLargeLog = true; 497 } 498 RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), 499 "class_" + (i + 1), isSlowLog, isLargeLog); 500 namedQueueRecorder.addRecord(rpcLogDetails); 501 } 502 LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records"); 503 504 Assert.assertNotEquals(-1, 505 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14)); 506 507 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 508 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 509 510 // confirm strict order of slow log payloads 511 return slowLogPayloads.size() == 14 && confirmPayloadParams(0, 153, slowLogPayloads) 512 && confirmPayloadParams(1, 151, slowLogPayloads) 513 && confirmPayloadParams(2, 149, slowLogPayloads) 514 && confirmPayloadParams(3, 147, slowLogPayloads) 515 && confirmPayloadParams(4, 145, slowLogPayloads) 516 && confirmPayloadParams(5, 143, slowLogPayloads) 517 && confirmPayloadParams(6, 141, slowLogPayloads) 518 && confirmPayloadParams(7, 139, slowLogPayloads) 519 && confirmPayloadParams(8, 137, slowLogPayloads) 520 && confirmPayloadParams(9, 135, slowLogPayloads) 521 && confirmPayloadParams(10, 133, slowLogPayloads) 522 && confirmPayloadParams(11, 131, slowLogPayloads) 523 && confirmPayloadParams(12, 129, slowLogPayloads) 524 && confirmPayloadParams(13, 127, slowLogPayloads); 525 })); 526 527 AdminProtos.SlowLogResponseRequest largeLogRequest = 528 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11) 529 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build(); 530 531 Assert.assertNotEquals(-1, 532 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(largeLogRequest).size() == 14)); 533 534 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 535 List<SlowLogPayload> largeLogPayloads = getSlowLogPayloads(largeLogRequest); 536 537 // confirm strict order of slow log payloads 538 return largeLogPayloads.size() == 14 && confirmPayloadParams(0, 154, largeLogPayloads) 539 && confirmPayloadParams(1, 152, largeLogPayloads) 540 && confirmPayloadParams(2, 150, largeLogPayloads) 541 && confirmPayloadParams(3, 148, largeLogPayloads) 542 && confirmPayloadParams(4, 146, largeLogPayloads) 543 && confirmPayloadParams(5, 144, largeLogPayloads) 544 && confirmPayloadParams(6, 142, largeLogPayloads) 545 && confirmPayloadParams(7, 140, largeLogPayloads) 546 && confirmPayloadParams(8, 138, largeLogPayloads) 547 && confirmPayloadParams(9, 136, largeLogPayloads) 548 && confirmPayloadParams(10, 134, largeLogPayloads) 549 && confirmPayloadParams(11, 132, largeLogPayloads) 550 && confirmPayloadParams(12, 130, largeLogPayloads) 551 && confirmPayloadParams(13, 128, largeLogPayloads); 552 })); 553 } 554 555 @Test 556 public void testSlowLogMixedFilters() throws Exception { 557 558 Configuration conf = applySlowLogRecorderConf(30); 559 Constructor<NamedQueueRecorder> constructor = 560 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 561 constructor.setAccessible(true); 562 namedQueueRecorder = constructor.newInstance(conf); 563 AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder() 564 .setLimit(15).setUserName("userName_87").setClientAddress("client_88").build(); 565 566 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 567 568 for (int i = 0; i < 100; i++) { 569 RpcLogDetails rpcLogDetails = 570 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 571 namedQueueRecorder.addRecord(rpcLogDetails); 572 } 573 574 Assert.assertNotEquals(-1, 575 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 2)); 576 577 AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder() 578 .setLimit(15).setUserName("userName_1").setClientAddress("client_2").build(); 579 Assert.assertEquals(0, getSlowLogPayloads(request2).size()); 580 581 AdminProtos.SlowLogResponseRequest request3 = AdminProtos.SlowLogResponseRequest.newBuilder() 582 .setLimit(15).setUserName("userName_87").setClientAddress("client_88") 583 .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND).build(); 584 Assert.assertEquals(0, getSlowLogPayloads(request3).size()); 585 586 AdminProtos.SlowLogResponseRequest request4 = AdminProtos.SlowLogResponseRequest.newBuilder() 587 .setLimit(15).setUserName("userName_87").setClientAddress("client_87") 588 .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND).build(); 589 Assert.assertEquals(1, getSlowLogPayloads(request4).size()); 590 591 AdminProtos.SlowLogResponseRequest request5 = AdminProtos.SlowLogResponseRequest.newBuilder() 592 .setLimit(15).setUserName("userName_88").setClientAddress("client_89") 593 .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR).build(); 594 Assert.assertEquals(2, getSlowLogPayloads(request5).size()); 595 596 AdminProtos.SlowLogResponseRequest requestSlowLog = 597 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); 598 Assert.assertNotEquals(-1, 599 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15)); 600 } 601 602 @Test 603 public void testOnlineSlowLogScanPayloadDefaultDisabled() throws Exception { 604 Configuration conf = applySlowLogRecorderConf(1); 605 conf.unset(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED); 606 Constructor<NamedQueueRecorder> constructor = 607 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 608 constructor.setAccessible(true); 609 namedQueueRecorder = constructor.newInstance(conf); 610 AdminProtos.SlowLogResponseRequest request = 611 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); 612 613 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 614 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 615 RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); 616 namedQueueRecorder.addRecord(rpcLogDetails); 617 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 618 Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny(); 619 if (slowLogPayload.isPresent()) { 620 return !slowLogPayload.get().hasScan(); 621 } 622 return false; 623 })); 624 } 625 626 @Test 627 public void testOnlineSlowLogScanPayloadExplicitlyDisabled() throws Exception { 628 Configuration conf = applySlowLogRecorderConf(1); 629 conf.setBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, false); 630 Constructor<NamedQueueRecorder> constructor = 631 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 632 constructor.setAccessible(true); 633 namedQueueRecorder = constructor.newInstance(conf); 634 AdminProtos.SlowLogResponseRequest request = 635 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); 636 637 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 638 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 639 RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); 640 namedQueueRecorder.addRecord(rpcLogDetails); 641 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 642 Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny(); 643 if (slowLogPayload.isPresent()) { 644 return !slowLogPayload.get().hasScan(); 645 } 646 return false; 647 })); 648 } 649 650 @Test 651 public void testOnlineSlowLogScanPayloadExplicitlyEnabled() throws Exception { 652 Configuration conf = applySlowLogRecorderConf(1); 653 conf.setBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, true); 654 Constructor<NamedQueueRecorder> constructor = 655 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 656 constructor.setAccessible(true); 657 namedQueueRecorder = constructor.newInstance(conf); 658 AdminProtos.SlowLogResponseRequest request = 659 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); 660 661 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 662 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 663 RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); 664 namedQueueRecorder.addRecord(rpcLogDetails); 665 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 666 Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny(); 667 if (slowLogPayload.isPresent()) { 668 return slowLogPayload.get().hasScan(); 669 } 670 return false; 671 })); 672 } 673 674 @Test 675 public void testOnlineSlowLogRequestAttributes() throws Exception { 676 Configuration conf = applySlowLogRecorderConf(1); 677 Constructor<NamedQueueRecorder> constructor = 678 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 679 constructor.setAccessible(true); 680 namedQueueRecorder = constructor.newInstance(conf); 681 AdminProtos.SlowLogResponseRequest request = 682 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); 683 684 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 685 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 686 RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); 687 namedQueueRecorder.addRecord(rpcLogDetails); 688 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 689 Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny(); 690 if (slowLogPayload.isPresent() && !slowLogPayload.get().getRequestAttributeList().isEmpty()) { 691 return slowLogPayload.get().getRequestAttributeList().containsAll(REQUEST_HEADERS); 692 } 693 return false; 694 })); 695 } 696 697 @Test 698 public void testOnlineSlowLogConnectionAttributes() throws Exception { 699 Configuration conf = applySlowLogRecorderConf(1); 700 Constructor<NamedQueueRecorder> constructor = 701 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 702 constructor.setAccessible(true); 703 namedQueueRecorder = constructor.newInstance(conf); 704 AdminProtos.SlowLogResponseRequest request = 705 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); 706 707 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 708 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 709 RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); 710 namedQueueRecorder.addRecord(rpcLogDetails); 711 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 712 Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny(); 713 if ( 714 slowLogPayload.isPresent() && !slowLogPayload.get().getConnectionAttributeList().isEmpty() 715 ) { 716 return slowLogPayload.get().getConnectionAttributeList().containsAll(CONNECTION_HEADERS); 717 } 718 return false; 719 })); 720 } 721 722 static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className, 723 int forcedParamIndex) { 724 RpcCall rpcCall = getRpcCall(userName, forcedParamIndex); 725 return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className, true, 726 true); 727 } 728 729 static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) { 730 RpcCall rpcCall = getRpcCall(userName); 731 return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className, true, 732 true); 733 } 734 735 private static RpcLogDetails getRpcLogDetailsOfScan() { 736 // forcedParamIndex of 0 results in a ScanRequest 737 return getRpcLogDetails("userName_1", "client_1", "class_1", 0); 738 } 739 740 private RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className, 741 boolean isSlowLog, boolean isLargeLog) { 742 RpcCall rpcCall = getRpcCall(userName); 743 return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className, 744 isSlowLog, isLargeLog); 745 } 746 747 private static RpcCall getRpcCall(String userName) { 748 return getRpcCall(userName, Optional.empty()); 749 } 750 751 private static RpcCall getRpcCall(String userName, int forcedParamIndex) { 752 return getRpcCall(userName, Optional.of(forcedParamIndex)); 753 } 754 755 @SuppressWarnings("checkstyle:methodlength") 756 private static RpcCall getRpcCall(String userName, Optional<Integer> forcedParamIndex) { 757 RpcCall rpcCall = new RpcCall() { 758 @Override 759 public BlockingService getService() { 760 return null; 761 } 762 763 @Override 764 public Descriptors.MethodDescriptor getMethod() { 765 return null; 766 } 767 768 @Override 769 public Message getParam() { 770 return getMessage(forcedParamIndex); 771 } 772 773 @Override 774 public ExtendedCellScanner getCellScanner() { 775 return null; 776 } 777 778 @Override 779 public long getReceiveTime() { 780 return 0; 781 } 782 783 @Override 784 public long getStartTime() { 785 return 0; 786 } 787 788 @Override 789 public void setStartTime(long startTime) { 790 } 791 792 @Override 793 public int getTimeout() { 794 return 0; 795 } 796 797 @Override 798 public int getPriority() { 799 return 0; 800 } 801 802 @Override 803 public long getDeadline() { 804 return 0; 805 } 806 807 @Override 808 public long getSize() { 809 return 0; 810 } 811 812 @Override 813 public RPCProtos.RequestHeader getHeader() { 814 return null; 815 } 816 817 @Override 818 public Map<String, byte[]> getConnectionAttributes() { 819 return CONNECTION_HEADERS.stream().collect(Collectors 820 .toMap(HBaseProtos.NameBytesPair::getName, pair -> pair.getValue().toByteArray())); 821 } 822 823 @Override 824 public Map<String, byte[]> getRequestAttributes() { 825 return REQUEST_HEADERS.stream().collect(Collectors.toMap(HBaseProtos.NameBytesPair::getName, 826 pair -> pair.getValue().toByteArray())); 827 } 828 829 @Override 830 public byte[] getRequestAttribute(String key) { 831 return null; 832 } 833 834 @Override 835 public int getRemotePort() { 836 return 0; 837 } 838 839 @Override 840 public void setResponse(Message param, ExtendedCellScanner cells, Throwable errorThrowable, 841 String error) { 842 } 843 844 @Override 845 public void sendResponseIfReady() throws IOException { 846 } 847 848 @Override 849 public void cleanup() { 850 } 851 852 @Override 853 public String toShortString() { 854 return null; 855 } 856 857 @Override 858 public long disconnectSince() { 859 return 0; 860 } 861 862 @Override 863 public boolean isClientCellBlockSupported() { 864 return false; 865 } 866 867 @Override 868 public Optional<User> getRequestUser() { 869 return getUser(userName); 870 } 871 872 @Override 873 public Optional<X509Certificate[]> getClientCertificateChain() { 874 return Optional.empty(); 875 } 876 877 @Override 878 public InetAddress getRemoteAddress() { 879 return null; 880 } 881 882 @Override 883 public HBaseProtos.VersionInfo getClientVersionInfo() { 884 return null; 885 } 886 887 @Override 888 public void setCallBack(RpcCallback callback) { 889 } 890 891 @Override 892 public boolean isRetryImmediatelySupported() { 893 return false; 894 } 895 896 @Override 897 public long getResponseCellSize() { 898 return 0; 899 } 900 901 @Override 902 public void incrementResponseCellSize(long cellSize) { 903 } 904 905 @Override 906 public long getBlockBytesScanned() { 907 return 0; 908 } 909 910 @Override 911 public void incrementBlockBytesScanned(long blockSize) { 912 } 913 914 @Override 915 public long getResponseExceptionSize() { 916 return 0; 917 } 918 919 @Override 920 public void incrementResponseExceptionSize(long exceptionSize) { 921 } 922 923 @Override 924 public void updateFsReadTime(long latencyMillis) { 925 926 } 927 928 @Override 929 public long getFsReadTime() { 930 return 0; 931 } 932 }; 933 return rpcCall; 934 } 935 936 private static Message getMessage(Optional<Integer> forcedParamIndex) { 937 938 i = (i + 1) % 3; 939 940 Message message = null; 941 942 switch (forcedParamIndex.orElse(i)) { 943 944 case 0: { 945 message = ClientProtos.ScanRequest.newBuilder() 946 .setRegion( 947 HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region1")) 948 .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME).build()) 949 .build(); 950 break; 951 } 952 case 1: { 953 message = ClientProtos.MutateRequest.newBuilder() 954 .setRegion( 955 HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region2")) 956 .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)) 957 .setMutation(ClientProtos.MutationProto.newBuilder() 958 .setRow(ByteString.copyFromUtf8("row123")).build()) 959 .build(); 960 break; 961 } 962 case 2: { 963 message = ClientProtos.GetRequest.newBuilder() 964 .setRegion( 965 HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region2")) 966 .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)) 967 .setGet(ClientProtos.Get.newBuilder().setRow(ByteString.copyFromUtf8("row123")).build()) 968 .build(); 969 break; 970 } 971 default: 972 throw new RuntimeException("Not supposed to get here?"); 973 } 974 975 return message; 976 977 } 978 979 private static Optional<User> getUser(String userName) { 980 981 return Optional.of(new User() { 982 @Override 983 public String getShortName() { 984 return userName; 985 } 986 987 @Override 988 public <T> T runAs(PrivilegedAction<T> action) { 989 return null; 990 } 991 992 @Override 993 public <T> T runAs(PrivilegedExceptionAction<T> action) { 994 return null; 995 } 996 }); 997 998 } 999 1000}