001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.namequeues; 019 020import java.io.IOException; 021import java.lang.reflect.Constructor; 022import java.net.InetAddress; 023import java.security.PrivilegedAction; 024import java.security.PrivilegedExceptionAction; 025import java.security.cert.X509Certificate; 026import java.util.Collections; 027import java.util.List; 028import java.util.Map; 029import java.util.Optional; 030import java.util.concurrent.CompletableFuture; 031import java.util.concurrent.TimeUnit; 032import java.util.stream.Collectors; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.CellScanner; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtility; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.ipc.RpcCall; 039import org.apache.hadoop.hbase.ipc.RpcCallback; 040import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 041import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 042import org.apache.hadoop.hbase.security.User; 043import org.apache.hadoop.hbase.testclassification.MasterTests; 044import org.apache.hadoop.hbase.testclassification.MediumTests; 045import org.junit.Assert; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; 054import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 055import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 056import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; 057import org.apache.hbase.thirdparty.com.google.protobuf.Message; 058 059import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; 064 065/** 066 * Tests for Online SlowLog Provider Service 067 */ 068@Category({ MasterTests.class, MediumTests.class }) 069public class TestNamedQueueRecorder { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestNamedQueueRecorder.class); 074 075 private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class); 076 077 private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility(); 078 private static final List<HBaseProtos.NameBytesPair> REQUEST_HEADERS = 079 ImmutableList.<HBaseProtos.NameBytesPair> builder() 080 .add(HBaseProtos.NameBytesPair.newBuilder().setName("1") 081 .setValue(ByteString.copyFromUtf8("r")).build()) 082 .add(HBaseProtos.NameBytesPair.newBuilder().setName("2") 083 .setValue(ByteString.copyFromUtf8("h")).build()) 084 .build(); 085 private static final List<HBaseProtos.NameBytesPair> CONNECTION_HEADERS = 086 ImmutableList.<HBaseProtos.NameBytesPair> builder() 087 .add(HBaseProtos.NameBytesPair.newBuilder().setName("1") 088 .setValue(ByteString.copyFromUtf8("c")).build()) 089 .add(HBaseProtos.NameBytesPair.newBuilder().setName("2") 090 .setValue(ByteString.copyFromUtf8("h")).build()) 091 .build(); 092 093 private NamedQueueRecorder namedQueueRecorder; 094 095 private static int i = 0; 096 097 private static Configuration applySlowLogRecorderConf(int eventSize) { 098 Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); 099 conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); 100 conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize); 101 return conf; 102 } 103 104 /** 105 * confirm that for a ringbuffer of slow logs, payload on given index of buffer has expected 106 * elements 107 * @param i index of ringbuffer logs 108 * @param j data value that was put on index i 109 * @param slowLogPayloads list of payload retrieved from {@link NamedQueueRecorder} 110 * @return if actual values are as per expectations 111 */ 112 private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) { 113 boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j); 114 boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j); 115 boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j); 116 return isClassExpected && isClientExpected && isUserExpected; 117 } 118 119 @Test 120 public void testOnlieSlowLogConsumption() throws Exception { 121 122 Configuration conf = applySlowLogRecorderConf(8); 123 Constructor<NamedQueueRecorder> constructor = 124 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 125 constructor.setAccessible(true); 126 namedQueueRecorder = constructor.newInstance(conf); 127 AdminProtos.SlowLogResponseRequest request = 128 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); 129 130 namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); 131 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 132 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 133 134 int i = 0; 135 136 // add 5 records initially 137 for (; i < 5; i++) { 138 RpcLogDetails rpcLogDetails = 139 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 140 namedQueueRecorder.addRecord(rpcLogDetails); 141 } 142 143 Assert.assertNotEquals(-1, 144 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 5)); 145 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 146 Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads)); 147 Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads)); 148 Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads)); 149 Assert.assertTrue(confirmPayloadParams(3, 2, slowLogPayloads)); 150 Assert.assertTrue(confirmPayloadParams(4, 1, slowLogPayloads)); 151 152 // add 2 more records 153 for (; i < 7; i++) { 154 RpcLogDetails rpcLogDetails = 155 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 156 namedQueueRecorder.addRecord(rpcLogDetails); 157 } 158 159 Assert.assertNotEquals(-1, 160 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 7)); 161 162 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 163 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); 164 return slowLogPayloadsList.size() == 7 && confirmPayloadParams(0, 7, slowLogPayloadsList) 165 && confirmPayloadParams(5, 2, slowLogPayloadsList) 166 && confirmPayloadParams(6, 1, slowLogPayloadsList); 167 })); 168 169 // add 3 more records 170 for (; i < 10; i++) { 171 RpcLogDetails rpcLogDetails = 172 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 173 namedQueueRecorder.addRecord(rpcLogDetails); 174 } 175 176 Assert.assertNotEquals(-1, 177 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 8)); 178 179 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 180 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); 181 // confirm ringbuffer is full 182 return slowLogPayloadsList.size() == 8 && confirmPayloadParams(7, 3, slowLogPayloadsList) 183 && confirmPayloadParams(0, 10, slowLogPayloadsList) 184 && confirmPayloadParams(1, 9, slowLogPayloadsList); 185 })); 186 187 // add 4 more records 188 for (; i < 14; i++) { 189 RpcLogDetails rpcLogDetails = 190 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 191 namedQueueRecorder.addRecord(rpcLogDetails); 192 } 193 194 Assert.assertNotEquals(-1, 195 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 8)); 196 197 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 198 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); 199 // confirm ringbuffer is full 200 // and ordered events 201 return slowLogPayloadsList.size() == 8 && confirmPayloadParams(0, 14, slowLogPayloadsList) 202 && confirmPayloadParams(1, 13, slowLogPayloadsList) 203 && confirmPayloadParams(2, 12, slowLogPayloadsList) 204 && confirmPayloadParams(3, 11, slowLogPayloadsList); 205 })); 206 207 AdminProtos.SlowLogResponseRequest largeLogRequest = 208 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15) 209 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build(); 210 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 211 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(largeLogRequest); 212 // confirm ringbuffer is full 213 // and ordered events 214 return slowLogPayloadsList.size() == 8 && confirmPayloadParams(0, 14, slowLogPayloadsList) 215 && confirmPayloadParams(1, 13, slowLogPayloadsList) 216 && confirmPayloadParams(2, 12, slowLogPayloadsList) 217 && confirmPayloadParams(3, 11, slowLogPayloadsList); 218 })); 219 220 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 221 boolean isRingBufferCleaned = 222 namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); 223 224 LOG.debug("cleared the ringbuffer of Online Slow Log records"); 225 226 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); 227 // confirm ringbuffer is empty 228 return slowLogPayloadsList.size() == 0 && isRingBufferCleaned; 229 })); 230 231 } 232 233 private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) { 234 NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); 235 namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT); 236 namedQueueGetRequest.setSlowLogResponseRequest(request); 237 NamedQueueGetResponse namedQueueGetResponse = 238 namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); 239 return namedQueueGetResponse == null 240 ? Collections.emptyList() 241 : namedQueueGetResponse.getSlowLogPayloads(); 242 } 243 244 @Test 245 public void testOnlineSlowLogWithHighRecords() throws Exception { 246 247 Configuration conf = applySlowLogRecorderConf(14); 248 Constructor<NamedQueueRecorder> constructor = 249 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 250 constructor.setAccessible(true); 251 namedQueueRecorder = constructor.newInstance(conf); 252 AdminProtos.SlowLogResponseRequest request = 253 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build(); 254 255 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 256 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 257 258 for (int i = 0; i < 14 * 11; i++) { 259 RpcLogDetails rpcLogDetails = 260 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 261 namedQueueRecorder.addRecord(rpcLogDetails); 262 } 263 LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records"); 264 265 Assert.assertNotEquals(-1, 266 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14)); 267 268 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 269 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 270 271 // confirm strict order of slow log payloads 272 return slowLogPayloads.size() == 14 && confirmPayloadParams(0, 154, slowLogPayloads) 273 && confirmPayloadParams(1, 153, slowLogPayloads) 274 && confirmPayloadParams(2, 152, slowLogPayloads) 275 && confirmPayloadParams(3, 151, slowLogPayloads) 276 && confirmPayloadParams(4, 150, slowLogPayloads) 277 && confirmPayloadParams(5, 149, slowLogPayloads) 278 && confirmPayloadParams(6, 148, slowLogPayloads) 279 && confirmPayloadParams(7, 147, slowLogPayloads) 280 && confirmPayloadParams(8, 146, slowLogPayloads) 281 && confirmPayloadParams(9, 145, slowLogPayloads) 282 && confirmPayloadParams(10, 144, slowLogPayloads) 283 && confirmPayloadParams(11, 143, slowLogPayloads) 284 && confirmPayloadParams(12, 142, slowLogPayloads) 285 && confirmPayloadParams(13, 141, slowLogPayloads); 286 })); 287 288 boolean isRingBufferCleaned = 289 namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); 290 Assert.assertTrue(isRingBufferCleaned); 291 LOG.debug("cleared the ringbuffer of Online Slow Log records"); 292 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 293 294 // confirm ringbuffer is empty 295 Assert.assertEquals(slowLogPayloads.size(), 0); 296 } 297 298 @Test 299 public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception { 300 Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); 301 conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY); 302 303 Constructor<NamedQueueRecorder> constructor = 304 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 305 constructor.setAccessible(true); 306 namedQueueRecorder = constructor.newInstance(conf); 307 AdminProtos.SlowLogResponseRequest request = 308 AdminProtos.SlowLogResponseRequest.newBuilder().build(); 309 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 310 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 311 for (int i = 0; i < 300; i++) { 312 RpcLogDetails rpcLogDetails = 313 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 314 namedQueueRecorder.addRecord(rpcLogDetails); 315 } 316 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 317 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 318 return slowLogPayloads.size() == 0; 319 })); 320 321 } 322 323 @Test 324 public void testOnlineSlowLogWithDisableConfig() throws Exception { 325 Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); 326 conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false); 327 Constructor<NamedQueueRecorder> constructor = 328 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 329 constructor.setAccessible(true); 330 namedQueueRecorder = constructor.newInstance(conf); 331 332 AdminProtos.SlowLogResponseRequest request = 333 AdminProtos.SlowLogResponseRequest.newBuilder().build(); 334 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 335 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 336 for (int i = 0; i < 300; i++) { 337 RpcLogDetails rpcLogDetails = 338 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 339 namedQueueRecorder.addRecord(rpcLogDetails); 340 } 341 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 342 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 343 return slowLogPayloads.size() == 0; 344 })); 345 conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); 346 } 347 348 @Test 349 public void testSlowLogFilters() throws Exception { 350 351 Configuration conf = applySlowLogRecorderConf(30); 352 Constructor<NamedQueueRecorder> constructor = 353 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 354 constructor.setAccessible(true); 355 namedQueueRecorder = constructor.newInstance(conf); 356 AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder() 357 .setLimit(15).setUserName("userName_87").build(); 358 359 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 360 361 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 362 363 for (int i = 0; i < 100; i++) { 364 RpcLogDetails rpcLogDetails = 365 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 366 namedQueueRecorder.addRecord(rpcLogDetails); 367 } 368 LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter"); 369 370 Assert.assertNotEquals(-1, 371 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 1)); 372 373 AdminProtos.SlowLogResponseRequest requestClient = AdminProtos.SlowLogResponseRequest 374 .newBuilder().setLimit(15).setClientAddress("client_85").build(); 375 Assert.assertNotEquals(-1, 376 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestClient).size() == 1)); 377 378 AdminProtos.SlowLogResponseRequest requestSlowLog = 379 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); 380 Assert.assertNotEquals(-1, 381 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15)); 382 } 383 384 @Test 385 public void testConcurrentSlowLogEvents() throws Exception { 386 387 Configuration conf = applySlowLogRecorderConf(50000); 388 Constructor<NamedQueueRecorder> constructor = 389 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 390 constructor.setAccessible(true); 391 namedQueueRecorder = constructor.newInstance(conf); 392 AdminProtos.SlowLogResponseRequest request = 393 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build(); 394 AdminProtos.SlowLogResponseRequest largeLogRequest = 395 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000) 396 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build(); 397 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 398 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 399 400 for (int j = 0; j < 1000; j++) { 401 402 CompletableFuture.runAsync(() -> { 403 for (int i = 0; i < 3500; i++) { 404 RpcLogDetails rpcLogDetails = 405 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 406 namedQueueRecorder.addRecord(rpcLogDetails); 407 } 408 }); 409 410 } 411 412 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); 413 414 Assert.assertNotEquals(-1, 415 HBASE_TESTING_UTILITY.waitFor(5000, () -> getSlowLogPayloads(request).size() > 10000)); 416 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(5000, 417 () -> getSlowLogPayloads(largeLogRequest).size() > 10000)); 418 } 419 420 @Test 421 public void testSlowLargeLogEvents() throws Exception { 422 Configuration conf = applySlowLogRecorderConf(28); 423 Constructor<NamedQueueRecorder> constructor = 424 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 425 constructor.setAccessible(true); 426 namedQueueRecorder = constructor.newInstance(conf); 427 428 AdminProtos.SlowLogResponseRequest request = 429 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build(); 430 431 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 432 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 433 434 boolean isSlowLog; 435 boolean isLargeLog; 436 for (int i = 0; i < 14 * 11; i++) { 437 if (i % 2 == 0) { 438 isSlowLog = true; 439 isLargeLog = false; 440 } else { 441 isSlowLog = false; 442 isLargeLog = true; 443 } 444 RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), 445 "class_" + (i + 1), isSlowLog, isLargeLog); 446 namedQueueRecorder.addRecord(rpcLogDetails); 447 } 448 LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records"); 449 450 Assert.assertNotEquals(-1, 451 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14)); 452 453 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 454 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 455 456 // confirm strict order of slow log payloads 457 return slowLogPayloads.size() == 14 && confirmPayloadParams(0, 153, slowLogPayloads) 458 && confirmPayloadParams(1, 151, slowLogPayloads) 459 && confirmPayloadParams(2, 149, slowLogPayloads) 460 && confirmPayloadParams(3, 147, slowLogPayloads) 461 && confirmPayloadParams(4, 145, slowLogPayloads) 462 && confirmPayloadParams(5, 143, slowLogPayloads) 463 && confirmPayloadParams(6, 141, slowLogPayloads) 464 && confirmPayloadParams(7, 139, slowLogPayloads) 465 && confirmPayloadParams(8, 137, slowLogPayloads) 466 && confirmPayloadParams(9, 135, slowLogPayloads) 467 && confirmPayloadParams(10, 133, slowLogPayloads) 468 && confirmPayloadParams(11, 131, slowLogPayloads) 469 && confirmPayloadParams(12, 129, slowLogPayloads) 470 && confirmPayloadParams(13, 127, slowLogPayloads); 471 })); 472 473 AdminProtos.SlowLogResponseRequest largeLogRequest = 474 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11) 475 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build(); 476 477 Assert.assertNotEquals(-1, 478 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(largeLogRequest).size() == 14)); 479 480 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 481 List<SlowLogPayload> largeLogPayloads = getSlowLogPayloads(largeLogRequest); 482 483 // confirm strict order of slow log payloads 484 return largeLogPayloads.size() == 14 && confirmPayloadParams(0, 154, largeLogPayloads) 485 && confirmPayloadParams(1, 152, largeLogPayloads) 486 && confirmPayloadParams(2, 150, largeLogPayloads) 487 && confirmPayloadParams(3, 148, largeLogPayloads) 488 && confirmPayloadParams(4, 146, largeLogPayloads) 489 && confirmPayloadParams(5, 144, largeLogPayloads) 490 && confirmPayloadParams(6, 142, largeLogPayloads) 491 && confirmPayloadParams(7, 140, largeLogPayloads) 492 && confirmPayloadParams(8, 138, largeLogPayloads) 493 && confirmPayloadParams(9, 136, largeLogPayloads) 494 && confirmPayloadParams(10, 134, largeLogPayloads) 495 && confirmPayloadParams(11, 132, largeLogPayloads) 496 && confirmPayloadParams(12, 130, largeLogPayloads) 497 && confirmPayloadParams(13, 128, largeLogPayloads); 498 })); 499 } 500 501 @Test 502 public void testSlowLogMixedFilters() throws Exception { 503 504 Configuration conf = applySlowLogRecorderConf(30); 505 Constructor<NamedQueueRecorder> constructor = 506 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 507 constructor.setAccessible(true); 508 namedQueueRecorder = constructor.newInstance(conf); 509 AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder() 510 .setLimit(15).setUserName("userName_87").setClientAddress("client_88").build(); 511 512 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 513 514 for (int i = 0; i < 100; i++) { 515 RpcLogDetails rpcLogDetails = 516 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 517 namedQueueRecorder.addRecord(rpcLogDetails); 518 } 519 520 Assert.assertNotEquals(-1, 521 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 2)); 522 523 AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder() 524 .setLimit(15).setUserName("userName_1").setClientAddress("client_2").build(); 525 Assert.assertEquals(0, getSlowLogPayloads(request2).size()); 526 527 AdminProtos.SlowLogResponseRequest request3 = AdminProtos.SlowLogResponseRequest.newBuilder() 528 .setLimit(15).setUserName("userName_87").setClientAddress("client_88") 529 .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND).build(); 530 Assert.assertEquals(0, getSlowLogPayloads(request3).size()); 531 532 AdminProtos.SlowLogResponseRequest request4 = AdminProtos.SlowLogResponseRequest.newBuilder() 533 .setLimit(15).setUserName("userName_87").setClientAddress("client_87") 534 .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND).build(); 535 Assert.assertEquals(1, getSlowLogPayloads(request4).size()); 536 537 AdminProtos.SlowLogResponseRequest request5 = AdminProtos.SlowLogResponseRequest.newBuilder() 538 .setLimit(15).setUserName("userName_88").setClientAddress("client_89") 539 .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR).build(); 540 Assert.assertEquals(2, getSlowLogPayloads(request5).size()); 541 542 AdminProtos.SlowLogResponseRequest requestSlowLog = 543 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); 544 Assert.assertNotEquals(-1, 545 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15)); 546 } 547 548 @Test 549 public void testOnlineSlowLogScanPayloadDefaultDisabled() throws Exception { 550 Configuration conf = applySlowLogRecorderConf(1); 551 conf.unset(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED); 552 Constructor<NamedQueueRecorder> constructor = 553 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 554 constructor.setAccessible(true); 555 namedQueueRecorder = constructor.newInstance(conf); 556 AdminProtos.SlowLogResponseRequest request = 557 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); 558 559 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 560 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 561 RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); 562 namedQueueRecorder.addRecord(rpcLogDetails); 563 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 564 Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny(); 565 if (slowLogPayload.isPresent()) { 566 return !slowLogPayload.get().hasScan(); 567 } 568 return false; 569 })); 570 } 571 572 @Test 573 public void testOnlineSlowLogScanPayloadExplicitlyDisabled() throws Exception { 574 Configuration conf = applySlowLogRecorderConf(1); 575 conf.setBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, false); 576 Constructor<NamedQueueRecorder> constructor = 577 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 578 constructor.setAccessible(true); 579 namedQueueRecorder = constructor.newInstance(conf); 580 AdminProtos.SlowLogResponseRequest request = 581 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); 582 583 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 584 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 585 RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); 586 namedQueueRecorder.addRecord(rpcLogDetails); 587 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 588 Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny(); 589 if (slowLogPayload.isPresent()) { 590 return !slowLogPayload.get().hasScan(); 591 } 592 return false; 593 })); 594 } 595 596 @Test 597 public void testOnlineSlowLogScanPayloadExplicitlyEnabled() throws Exception { 598 Configuration conf = applySlowLogRecorderConf(1); 599 conf.setBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, true); 600 Constructor<NamedQueueRecorder> constructor = 601 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 602 constructor.setAccessible(true); 603 namedQueueRecorder = constructor.newInstance(conf); 604 AdminProtos.SlowLogResponseRequest request = 605 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); 606 607 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 608 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 609 RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); 610 namedQueueRecorder.addRecord(rpcLogDetails); 611 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 612 Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny(); 613 if (slowLogPayload.isPresent()) { 614 return slowLogPayload.get().hasScan(); 615 } 616 return false; 617 })); 618 } 619 620 @Test 621 public void testOnlineSlowLogRequestAttributes() throws Exception { 622 Configuration conf = applySlowLogRecorderConf(1); 623 Constructor<NamedQueueRecorder> constructor = 624 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 625 constructor.setAccessible(true); 626 namedQueueRecorder = constructor.newInstance(conf); 627 AdminProtos.SlowLogResponseRequest request = 628 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); 629 630 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 631 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 632 RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); 633 namedQueueRecorder.addRecord(rpcLogDetails); 634 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 635 Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny(); 636 if (slowLogPayload.isPresent() && !slowLogPayload.get().getRequestAttributeList().isEmpty()) { 637 return slowLogPayload.get().getRequestAttributeList().containsAll(REQUEST_HEADERS); 638 } 639 return false; 640 })); 641 } 642 643 @Test 644 public void testOnlineSlowLogConnectionAttributes() throws Exception { 645 Configuration conf = applySlowLogRecorderConf(1); 646 Constructor<NamedQueueRecorder> constructor = 647 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 648 constructor.setAccessible(true); 649 namedQueueRecorder = constructor.newInstance(conf); 650 AdminProtos.SlowLogResponseRequest request = 651 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); 652 653 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 654 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 655 RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); 656 namedQueueRecorder.addRecord(rpcLogDetails); 657 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 658 Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny(); 659 if ( 660 slowLogPayload.isPresent() && !slowLogPayload.get().getConnectionAttributeList().isEmpty() 661 ) { 662 return slowLogPayload.get().getConnectionAttributeList().containsAll(CONNECTION_HEADERS); 663 } 664 return false; 665 })); 666 } 667 668 static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className, 669 int forcedParamIndex) { 670 RpcCall rpcCall = getRpcCall(userName, forcedParamIndex); 671 return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className, true, 672 true); 673 } 674 675 static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) { 676 RpcCall rpcCall = getRpcCall(userName); 677 return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className, true, 678 true); 679 } 680 681 private static RpcLogDetails getRpcLogDetailsOfScan() { 682 // forcedParamIndex of 0 results in a ScanRequest 683 return getRpcLogDetails("userName_1", "client_1", "class_1", 0); 684 } 685 686 private RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className, 687 boolean isSlowLog, boolean isLargeLog) { 688 RpcCall rpcCall = getRpcCall(userName); 689 return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className, 690 isSlowLog, isLargeLog); 691 } 692 693 private static RpcCall getRpcCall(String userName) { 694 return getRpcCall(userName, Optional.empty()); 695 } 696 697 private static RpcCall getRpcCall(String userName, int forcedParamIndex) { 698 return getRpcCall(userName, Optional.of(forcedParamIndex)); 699 } 700 701 @SuppressWarnings("checkstyle:methodlength") 702 private static RpcCall getRpcCall(String userName, Optional<Integer> forcedParamIndex) { 703 RpcCall rpcCall = new RpcCall() { 704 @Override 705 public BlockingService getService() { 706 return null; 707 } 708 709 @Override 710 public Descriptors.MethodDescriptor getMethod() { 711 return null; 712 } 713 714 @Override 715 public Message getParam() { 716 return getMessage(forcedParamIndex); 717 } 718 719 @Override 720 public CellScanner getCellScanner() { 721 return null; 722 } 723 724 @Override 725 public long getReceiveTime() { 726 return 0; 727 } 728 729 @Override 730 public long getStartTime() { 731 return 0; 732 } 733 734 @Override 735 public void setStartTime(long startTime) { 736 } 737 738 @Override 739 public int getTimeout() { 740 return 0; 741 } 742 743 @Override 744 public int getPriority() { 745 return 0; 746 } 747 748 @Override 749 public long getDeadline() { 750 return 0; 751 } 752 753 @Override 754 public long getSize() { 755 return 0; 756 } 757 758 @Override 759 public RPCProtos.RequestHeader getHeader() { 760 return null; 761 } 762 763 @Override 764 public Map<String, byte[]> getConnectionAttributes() { 765 return CONNECTION_HEADERS.stream().collect(Collectors 766 .toMap(HBaseProtos.NameBytesPair::getName, pair -> pair.getValue().toByteArray())); 767 } 768 769 @Override 770 public Map<String, byte[]> getRequestAttributes() { 771 return REQUEST_HEADERS.stream().collect(Collectors.toMap(HBaseProtos.NameBytesPair::getName, 772 pair -> pair.getValue().toByteArray())); 773 } 774 775 @Override 776 public byte[] getRequestAttribute(String key) { 777 return null; 778 } 779 780 @Override 781 public int getRemotePort() { 782 return 0; 783 } 784 785 @Override 786 public void setResponse(Message param, CellScanner cells, Throwable errorThrowable, 787 String error) { 788 } 789 790 @Override 791 public void sendResponseIfReady() throws IOException { 792 } 793 794 @Override 795 public void cleanup() { 796 } 797 798 @Override 799 public String toShortString() { 800 return null; 801 } 802 803 @Override 804 public long disconnectSince() { 805 return 0; 806 } 807 808 @Override 809 public boolean isClientCellBlockSupported() { 810 return false; 811 } 812 813 @Override 814 public Optional<User> getRequestUser() { 815 return getUser(userName); 816 } 817 818 @Override 819 public Optional<X509Certificate[]> getClientCertificateChain() { 820 return Optional.empty(); 821 } 822 823 @Override 824 public InetAddress getRemoteAddress() { 825 return null; 826 } 827 828 @Override 829 public HBaseProtos.VersionInfo getClientVersionInfo() { 830 return null; 831 } 832 833 @Override 834 public void setCallBack(RpcCallback callback) { 835 } 836 837 @Override 838 public boolean isRetryImmediatelySupported() { 839 return false; 840 } 841 842 @Override 843 public long getResponseCellSize() { 844 return 0; 845 } 846 847 @Override 848 public void incrementResponseCellSize(long cellSize) { 849 } 850 851 @Override 852 public long getBlockBytesScanned() { 853 return 0; 854 } 855 856 @Override 857 public void incrementBlockBytesScanned(long blockSize) { 858 } 859 860 @Override 861 public long getResponseExceptionSize() { 862 return 0; 863 } 864 865 @Override 866 public void incrementResponseExceptionSize(long exceptionSize) { 867 } 868 869 @Override 870 public void updateFsReadTime(long latencyMillis) { 871 872 } 873 874 @Override 875 public long getFsReadTime() { 876 return 0; 877 } 878 }; 879 return rpcCall; 880 } 881 882 private static Message getMessage(Optional<Integer> forcedParamIndex) { 883 884 i = (i + 1) % 3; 885 886 Message message = null; 887 888 switch (forcedParamIndex.orElse(i)) { 889 890 case 0: { 891 message = ClientProtos.ScanRequest.newBuilder() 892 .setRegion( 893 HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region1")) 894 .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME).build()) 895 .build(); 896 break; 897 } 898 case 1: { 899 message = ClientProtos.MutateRequest.newBuilder() 900 .setRegion( 901 HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region2")) 902 .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)) 903 .setMutation(ClientProtos.MutationProto.newBuilder() 904 .setRow(ByteString.copyFromUtf8("row123")).build()) 905 .build(); 906 break; 907 } 908 case 2: { 909 message = ClientProtos.GetRequest.newBuilder() 910 .setRegion( 911 HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region2")) 912 .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)) 913 .setGet(ClientProtos.Get.newBuilder().setRow(ByteString.copyFromUtf8("row123")).build()) 914 .build(); 915 break; 916 } 917 default: 918 throw new RuntimeException("Not supposed to get here?"); 919 } 920 921 return message; 922 923 } 924 925 private static Optional<User> getUser(String userName) { 926 927 return Optional.of(new User() { 928 @Override 929 public String getShortName() { 930 return userName; 931 } 932 933 @Override 934 public <T> T runAs(PrivilegedAction<T> action) { 935 return null; 936 } 937 938 @Override 939 public <T> T runAs(PrivilegedExceptionAction<T> action) { 940 return null; 941 } 942 }); 943 944 } 945 946}