001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues;
019
020import java.io.IOException;
021import java.lang.reflect.Constructor;
022import java.net.InetAddress;
023import java.security.PrivilegedAction;
024import java.security.PrivilegedExceptionAction;
025import java.security.cert.X509Certificate;
026import java.util.Collections;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.concurrent.CompletableFuture;
031import java.util.concurrent.TimeUnit;
032import java.util.stream.Collectors;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.ExtendedCellScanner;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.ipc.RpcCall;
039import org.apache.hadoop.hbase.ipc.RpcCallback;
040import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
041import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
042import org.apache.hadoop.hbase.security.User;
043import org.apache.hadoop.hbase.testclassification.MasterTests;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.junit.Assert;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
054import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
055import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
056import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
057import org.apache.hbase.thirdparty.com.google.protobuf.Message;
058
059import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
064
065/**
066 * Tests for Online SlowLog Provider Service
067 */
068@Category({ MasterTests.class, MediumTests.class })
069public class TestNamedQueueRecorder {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073    HBaseClassTestRule.forClass(TestNamedQueueRecorder.class);
074
075  private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);
076
077  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();
078  private static final List<HBaseProtos.NameBytesPair> REQUEST_HEADERS =
079    ImmutableList.<HBaseProtos.NameBytesPair> builder()
080      .add(HBaseProtos.NameBytesPair.newBuilder().setName("1")
081        .setValue(ByteString.copyFromUtf8("r")).build())
082      .add(HBaseProtos.NameBytesPair.newBuilder().setName("2")
083        .setValue(ByteString.copyFromUtf8("h")).build())
084      .build();
085  private static final List<HBaseProtos.NameBytesPair> CONNECTION_HEADERS =
086    ImmutableList.<HBaseProtos.NameBytesPair> builder()
087      .add(HBaseProtos.NameBytesPair.newBuilder().setName("1")
088        .setValue(ByteString.copyFromUtf8("c")).build())
089      .add(HBaseProtos.NameBytesPair.newBuilder().setName("2")
090        .setValue(ByteString.copyFromUtf8("h")).build())
091      .build();
092
093  private NamedQueueRecorder namedQueueRecorder;
094
095  private static int i = 0;
096
097  private static Configuration applySlowLogRecorderConf(int eventSize) {
098    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
099    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
100    conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize);
101    return conf;
102  }
103
104  /**
105   * confirm that for a ringbuffer of slow logs, payload on given index of buffer has expected
106   * elements
107   * @param i               index of ringbuffer logs
108   * @param j               data value that was put on index i
109   * @param slowLogPayloads list of payload retrieved from {@link NamedQueueRecorder}
110   * @return if actual values are as per expectations
111   */
112  private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) {
113    boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j);
114    boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j);
115    boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j);
116    return isClassExpected && isClientExpected && isUserExpected;
117  }
118
119  @Test
120  public void testOnlieSlowLogConsumption() throws Exception {
121
122    Configuration conf = applySlowLogRecorderConf(8);
123    Constructor<NamedQueueRecorder> constructor =
124      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
125    constructor.setAccessible(true);
126    namedQueueRecorder = constructor.newInstance(conf);
127    AdminProtos.SlowLogResponseRequest request =
128      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
129
130    namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
131    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
132    LOG.debug("Initially ringbuffer of Slow Log records is empty");
133
134    int i = 0;
135
136    // add 5 records initially
137    for (; i < 5; i++) {
138      RpcLogDetails rpcLogDetails =
139        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
140      namedQueueRecorder.addRecord(rpcLogDetails);
141    }
142
143    Assert.assertNotEquals(-1,
144      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 5));
145    List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
146    Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads));
147    Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads));
148    Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads));
149    Assert.assertTrue(confirmPayloadParams(3, 2, slowLogPayloads));
150    Assert.assertTrue(confirmPayloadParams(4, 1, slowLogPayloads));
151
152    // add 2 more records
153    for (; i < 7; i++) {
154      RpcLogDetails rpcLogDetails =
155        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
156      namedQueueRecorder.addRecord(rpcLogDetails);
157    }
158
159    Assert.assertNotEquals(-1,
160      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 7));
161
162    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
163      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
164      return slowLogPayloadsList.size() == 7 && confirmPayloadParams(0, 7, slowLogPayloadsList)
165        && confirmPayloadParams(5, 2, slowLogPayloadsList)
166        && confirmPayloadParams(6, 1, slowLogPayloadsList);
167    }));
168
169    // add 3 more records
170    for (; i < 10; i++) {
171      RpcLogDetails rpcLogDetails =
172        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
173      namedQueueRecorder.addRecord(rpcLogDetails);
174    }
175
176    Assert.assertNotEquals(-1,
177      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 8));
178
179    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
180      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
181      // confirm ringbuffer is full
182      return slowLogPayloadsList.size() == 8 && confirmPayloadParams(7, 3, slowLogPayloadsList)
183        && confirmPayloadParams(0, 10, slowLogPayloadsList)
184        && confirmPayloadParams(1, 9, slowLogPayloadsList);
185    }));
186
187    // add 4 more records
188    for (; i < 14; i++) {
189      RpcLogDetails rpcLogDetails =
190        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
191      namedQueueRecorder.addRecord(rpcLogDetails);
192    }
193
194    Assert.assertNotEquals(-1,
195      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 8));
196
197    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
198      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
199      // confirm ringbuffer is full
200      // and ordered events
201      return slowLogPayloadsList.size() == 8 && confirmPayloadParams(0, 14, slowLogPayloadsList)
202        && confirmPayloadParams(1, 13, slowLogPayloadsList)
203        && confirmPayloadParams(2, 12, slowLogPayloadsList)
204        && confirmPayloadParams(3, 11, slowLogPayloadsList);
205    }));
206
207    AdminProtos.SlowLogResponseRequest largeLogRequest =
208      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15)
209        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build();
210    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
211      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(largeLogRequest);
212      // confirm ringbuffer is full
213      // and ordered events
214      return slowLogPayloadsList.size() == 8 && confirmPayloadParams(0, 14, slowLogPayloadsList)
215        && confirmPayloadParams(1, 13, slowLogPayloadsList)
216        && confirmPayloadParams(2, 12, slowLogPayloadsList)
217        && confirmPayloadParams(3, 11, slowLogPayloadsList);
218    }));
219
220    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
221      boolean isRingBufferCleaned =
222        namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
223
224      LOG.debug("cleared the ringbuffer of Online Slow Log records");
225
226      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
227      // confirm ringbuffer is empty
228      return slowLogPayloadsList.size() == 0 && isRingBufferCleaned;
229    }));
230
231  }
232
233  private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
234    NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
235    namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
236    namedQueueGetRequest.setSlowLogResponseRequest(request);
237    NamedQueueGetResponse namedQueueGetResponse =
238      namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
239    return namedQueueGetResponse == null
240      ? Collections.emptyList()
241      : namedQueueGetResponse.getSlowLogPayloads();
242  }
243
244  @Test
245  public void testOnlineSlowLogWithHighRecords() throws Exception {
246
247    Configuration conf = applySlowLogRecorderConf(14);
248    Constructor<NamedQueueRecorder> constructor =
249      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
250    constructor.setAccessible(true);
251    namedQueueRecorder = constructor.newInstance(conf);
252    AdminProtos.SlowLogResponseRequest request =
253      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
254
255    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
256    LOG.debug("Initially ringbuffer of Slow Log records is empty");
257
258    for (int i = 0; i < 14 * 11; i++) {
259      RpcLogDetails rpcLogDetails =
260        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
261      namedQueueRecorder.addRecord(rpcLogDetails);
262    }
263    LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
264
265    Assert.assertNotEquals(-1,
266      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14));
267
268    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
269      List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
270
271      // confirm strict order of slow log payloads
272      return slowLogPayloads.size() == 14 && confirmPayloadParams(0, 154, slowLogPayloads)
273        && confirmPayloadParams(1, 153, slowLogPayloads)
274        && confirmPayloadParams(2, 152, slowLogPayloads)
275        && confirmPayloadParams(3, 151, slowLogPayloads)
276        && confirmPayloadParams(4, 150, slowLogPayloads)
277        && confirmPayloadParams(5, 149, slowLogPayloads)
278        && confirmPayloadParams(6, 148, slowLogPayloads)
279        && confirmPayloadParams(7, 147, slowLogPayloads)
280        && confirmPayloadParams(8, 146, slowLogPayloads)
281        && confirmPayloadParams(9, 145, slowLogPayloads)
282        && confirmPayloadParams(10, 144, slowLogPayloads)
283        && confirmPayloadParams(11, 143, slowLogPayloads)
284        && confirmPayloadParams(12, 142, slowLogPayloads)
285        && confirmPayloadParams(13, 141, slowLogPayloads);
286    }));
287
288    boolean isRingBufferCleaned =
289      namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
290    Assert.assertTrue(isRingBufferCleaned);
291    LOG.debug("cleared the ringbuffer of Online Slow Log records");
292    List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
293
294    // confirm ringbuffer is empty
295    Assert.assertEquals(slowLogPayloads.size(), 0);
296  }
297
298  @Test
299  public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception {
300    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
301    conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY);
302
303    Constructor<NamedQueueRecorder> constructor =
304      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
305    constructor.setAccessible(true);
306    namedQueueRecorder = constructor.newInstance(conf);
307    AdminProtos.SlowLogResponseRequest request =
308      AdminProtos.SlowLogResponseRequest.newBuilder().build();
309    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
310    LOG.debug("Initially ringbuffer of Slow Log records is empty");
311    for (int i = 0; i < 300; i++) {
312      RpcLogDetails rpcLogDetails =
313        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
314      namedQueueRecorder.addRecord(rpcLogDetails);
315    }
316    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
317      List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
318      return slowLogPayloads.size() == 0;
319    }));
320
321  }
322
323  @Test
324  public void testOnlineSlowLogWithDisableConfig() throws Exception {
325    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
326    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false);
327    Constructor<NamedQueueRecorder> constructor =
328      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
329    constructor.setAccessible(true);
330    namedQueueRecorder = constructor.newInstance(conf);
331
332    AdminProtos.SlowLogResponseRequest request =
333      AdminProtos.SlowLogResponseRequest.newBuilder().build();
334    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
335    LOG.debug("Initially ringbuffer of Slow Log records is empty");
336    for (int i = 0; i < 300; i++) {
337      RpcLogDetails rpcLogDetails =
338        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
339      namedQueueRecorder.addRecord(rpcLogDetails);
340    }
341    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
342      List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
343      return slowLogPayloads.size() == 0;
344    }));
345    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
346  }
347
348  @Test
349  public void testSlowLogFilters() throws Exception {
350
351    Configuration conf = applySlowLogRecorderConf(30);
352    Constructor<NamedQueueRecorder> constructor =
353      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
354    constructor.setAccessible(true);
355    namedQueueRecorder = constructor.newInstance(conf);
356    AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder()
357      .setLimit(15).setUserName("userName_87").build();
358
359    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
360
361    LOG.debug("Initially ringbuffer of Slow Log records is empty");
362
363    for (int i = 0; i < 100; i++) {
364      RpcLogDetails rpcLogDetails =
365        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
366      namedQueueRecorder.addRecord(rpcLogDetails);
367    }
368    LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter");
369
370    Assert.assertNotEquals(-1,
371      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 1));
372
373    AdminProtos.SlowLogResponseRequest requestClient = AdminProtos.SlowLogResponseRequest
374      .newBuilder().setLimit(15).setClientAddress("client_85").build();
375    Assert.assertNotEquals(-1,
376      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestClient).size() == 1));
377
378    AdminProtos.SlowLogResponseRequest requestSlowLog =
379      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
380    Assert.assertNotEquals(-1,
381      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15));
382  }
383
384  @Test
385  public void testSlowLogFilterWithClientAddress() throws Exception {
386    Configuration conf = applySlowLogRecorderConf(10);
387    Constructor<NamedQueueRecorder> constructor =
388      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
389    constructor.setAccessible(true);
390    namedQueueRecorder = constructor.newInstance(conf);
391    AdminProtos.SlowLogResponseRequest request =
392      AdminProtos.SlowLogResponseRequest.newBuilder().build();
393    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
394
395    String[] clientAddressArray = new String[] { "[127:1:1:1:1:1:1:1]:1", "[127:1:1:1:1:1:1:1]:2",
396      "[127:1:1:1:1:1:1:1]:3", "127.0.0.1:1", "127.0.0.1:2" };
397    boolean isSlowLog;
398    boolean isLargeLog;
399    for (int i = 0; i < 10; i++) {
400      if (i % 2 == 0) {
401        isSlowLog = true;
402        isLargeLog = false;
403      } else {
404        isSlowLog = false;
405        isLargeLog = true;
406      }
407      RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1),
408        clientAddressArray[i % 5], "class_" + (i + 1), isSlowLog, isLargeLog);
409      namedQueueRecorder.addRecord(rpcLogDetails);
410    }
411
412    AdminProtos.SlowLogResponseRequest largeLogRequestIPv6WithPort =
413      AdminProtos.SlowLogResponseRequest.newBuilder()
414        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
415        .setClientAddress("[127:1:1:1:1:1:1:1]:2").build();
416    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
417      () -> getSlowLogPayloads(largeLogRequestIPv6WithPort).size() == 1));
418    AdminProtos.SlowLogResponseRequest largeLogRequestIPv6WithoutPort =
419      AdminProtos.SlowLogResponseRequest.newBuilder()
420        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
421        .setClientAddress("[127:1:1:1:1:1:1:1]").build();
422    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
423      () -> getSlowLogPayloads(largeLogRequestIPv6WithoutPort).size() == 3));
424    AdminProtos.SlowLogResponseRequest largeLogRequestIPv4WithPort =
425      AdminProtos.SlowLogResponseRequest.newBuilder()
426        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
427        .setClientAddress("127.0.0.1:1").build();
428    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
429      () -> getSlowLogPayloads(largeLogRequestIPv4WithPort).size() == 1));
430    AdminProtos.SlowLogResponseRequest largeLogRequestIPv4WithoutPort =
431      AdminProtos.SlowLogResponseRequest.newBuilder()
432        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
433        .setClientAddress("127.0.0.1").build();
434    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
435      () -> getSlowLogPayloads(largeLogRequestIPv4WithoutPort).size() == 2));
436  }
437
438  @Test
439  public void testConcurrentSlowLogEvents() throws Exception {
440
441    Configuration conf = applySlowLogRecorderConf(50000);
442    Constructor<NamedQueueRecorder> constructor =
443      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
444    constructor.setAccessible(true);
445    namedQueueRecorder = constructor.newInstance(conf);
446    AdminProtos.SlowLogResponseRequest request =
447      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
448    AdminProtos.SlowLogResponseRequest largeLogRequest =
449      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000)
450        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build();
451    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
452    LOG.debug("Initially ringbuffer of Slow Log records is empty");
453
454    for (int j = 0; j < 1000; j++) {
455
456      CompletableFuture.runAsync(() -> {
457        for (int i = 0; i < 3500; i++) {
458          RpcLogDetails rpcLogDetails =
459            getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
460          namedQueueRecorder.addRecord(rpcLogDetails);
461        }
462      });
463
464    }
465
466    Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
467
468    Assert.assertNotEquals(-1,
469      HBASE_TESTING_UTILITY.waitFor(5000, () -> getSlowLogPayloads(request).size() > 10000));
470    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(5000,
471      () -> getSlowLogPayloads(largeLogRequest).size() > 10000));
472  }
473
474  @Test
475  public void testSlowLargeLogEvents() throws Exception {
476    Configuration conf = applySlowLogRecorderConf(28);
477    Constructor<NamedQueueRecorder> constructor =
478      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
479    constructor.setAccessible(true);
480    namedQueueRecorder = constructor.newInstance(conf);
481
482    AdminProtos.SlowLogResponseRequest request =
483      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
484
485    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
486    LOG.debug("Initially ringbuffer of Slow Log records is empty");
487
488    boolean isSlowLog;
489    boolean isLargeLog;
490    for (int i = 0; i < 14 * 11; i++) {
491      if (i % 2 == 0) {
492        isSlowLog = true;
493        isLargeLog = false;
494      } else {
495        isSlowLog = false;
496        isLargeLog = true;
497      }
498      RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1),
499        "class_" + (i + 1), isSlowLog, isLargeLog);
500      namedQueueRecorder.addRecord(rpcLogDetails);
501    }
502    LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
503
504    Assert.assertNotEquals(-1,
505      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14));
506
507    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
508      List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
509
510      // confirm strict order of slow log payloads
511      return slowLogPayloads.size() == 14 && confirmPayloadParams(0, 153, slowLogPayloads)
512        && confirmPayloadParams(1, 151, slowLogPayloads)
513        && confirmPayloadParams(2, 149, slowLogPayloads)
514        && confirmPayloadParams(3, 147, slowLogPayloads)
515        && confirmPayloadParams(4, 145, slowLogPayloads)
516        && confirmPayloadParams(5, 143, slowLogPayloads)
517        && confirmPayloadParams(6, 141, slowLogPayloads)
518        && confirmPayloadParams(7, 139, slowLogPayloads)
519        && confirmPayloadParams(8, 137, slowLogPayloads)
520        && confirmPayloadParams(9, 135, slowLogPayloads)
521        && confirmPayloadParams(10, 133, slowLogPayloads)
522        && confirmPayloadParams(11, 131, slowLogPayloads)
523        && confirmPayloadParams(12, 129, slowLogPayloads)
524        && confirmPayloadParams(13, 127, slowLogPayloads);
525    }));
526
527    AdminProtos.SlowLogResponseRequest largeLogRequest =
528      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11)
529        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build();
530
531    Assert.assertNotEquals(-1,
532      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(largeLogRequest).size() == 14));
533
534    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
535      List<SlowLogPayload> largeLogPayloads = getSlowLogPayloads(largeLogRequest);
536
537      // confirm strict order of slow log payloads
538      return largeLogPayloads.size() == 14 && confirmPayloadParams(0, 154, largeLogPayloads)
539        && confirmPayloadParams(1, 152, largeLogPayloads)
540        && confirmPayloadParams(2, 150, largeLogPayloads)
541        && confirmPayloadParams(3, 148, largeLogPayloads)
542        && confirmPayloadParams(4, 146, largeLogPayloads)
543        && confirmPayloadParams(5, 144, largeLogPayloads)
544        && confirmPayloadParams(6, 142, largeLogPayloads)
545        && confirmPayloadParams(7, 140, largeLogPayloads)
546        && confirmPayloadParams(8, 138, largeLogPayloads)
547        && confirmPayloadParams(9, 136, largeLogPayloads)
548        && confirmPayloadParams(10, 134, largeLogPayloads)
549        && confirmPayloadParams(11, 132, largeLogPayloads)
550        && confirmPayloadParams(12, 130, largeLogPayloads)
551        && confirmPayloadParams(13, 128, largeLogPayloads);
552    }));
553  }
554
555  @Test
556  public void testSlowLogMixedFilters() throws Exception {
557
558    Configuration conf = applySlowLogRecorderConf(30);
559    Constructor<NamedQueueRecorder> constructor =
560      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
561    constructor.setAccessible(true);
562    namedQueueRecorder = constructor.newInstance(conf);
563    AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder()
564      .setLimit(15).setUserName("userName_87").setClientAddress("client_88").build();
565
566    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
567
568    for (int i = 0; i < 100; i++) {
569      RpcLogDetails rpcLogDetails =
570        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
571      namedQueueRecorder.addRecord(rpcLogDetails);
572    }
573
574    Assert.assertNotEquals(-1,
575      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 2));
576
577    AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder()
578      .setLimit(15).setUserName("userName_1").setClientAddress("client_2").build();
579    Assert.assertEquals(0, getSlowLogPayloads(request2).size());
580
581    AdminProtos.SlowLogResponseRequest request3 = AdminProtos.SlowLogResponseRequest.newBuilder()
582      .setLimit(15).setUserName("userName_87").setClientAddress("client_88")
583      .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND).build();
584    Assert.assertEquals(0, getSlowLogPayloads(request3).size());
585
586    AdminProtos.SlowLogResponseRequest request4 = AdminProtos.SlowLogResponseRequest.newBuilder()
587      .setLimit(15).setUserName("userName_87").setClientAddress("client_87")
588      .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND).build();
589    Assert.assertEquals(1, getSlowLogPayloads(request4).size());
590
591    AdminProtos.SlowLogResponseRequest request5 = AdminProtos.SlowLogResponseRequest.newBuilder()
592      .setLimit(15).setUserName("userName_88").setClientAddress("client_89")
593      .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR).build();
594    Assert.assertEquals(2, getSlowLogPayloads(request5).size());
595
596    AdminProtos.SlowLogResponseRequest requestSlowLog =
597      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
598    Assert.assertNotEquals(-1,
599      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15));
600  }
601
602  @Test
603  public void testOnlineSlowLogScanPayloadDefaultDisabled() throws Exception {
604    Configuration conf = applySlowLogRecorderConf(1);
605    conf.unset(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED);
606    Constructor<NamedQueueRecorder> constructor =
607      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
608    constructor.setAccessible(true);
609    namedQueueRecorder = constructor.newInstance(conf);
610    AdminProtos.SlowLogResponseRequest request =
611      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
612
613    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
614    LOG.debug("Initially ringbuffer of Slow Log records is empty");
615    RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
616    namedQueueRecorder.addRecord(rpcLogDetails);
617    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
618      Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
619      if (slowLogPayload.isPresent()) {
620        return !slowLogPayload.get().hasScan();
621      }
622      return false;
623    }));
624  }
625
626  @Test
627  public void testOnlineSlowLogScanPayloadExplicitlyDisabled() throws Exception {
628    Configuration conf = applySlowLogRecorderConf(1);
629    conf.setBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, false);
630    Constructor<NamedQueueRecorder> constructor =
631      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
632    constructor.setAccessible(true);
633    namedQueueRecorder = constructor.newInstance(conf);
634    AdminProtos.SlowLogResponseRequest request =
635      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
636
637    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
638    LOG.debug("Initially ringbuffer of Slow Log records is empty");
639    RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
640    namedQueueRecorder.addRecord(rpcLogDetails);
641    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
642      Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
643      if (slowLogPayload.isPresent()) {
644        return !slowLogPayload.get().hasScan();
645      }
646      return false;
647    }));
648  }
649
650  @Test
651  public void testOnlineSlowLogScanPayloadExplicitlyEnabled() throws Exception {
652    Configuration conf = applySlowLogRecorderConf(1);
653    conf.setBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, true);
654    Constructor<NamedQueueRecorder> constructor =
655      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
656    constructor.setAccessible(true);
657    namedQueueRecorder = constructor.newInstance(conf);
658    AdminProtos.SlowLogResponseRequest request =
659      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
660
661    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
662    LOG.debug("Initially ringbuffer of Slow Log records is empty");
663    RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
664    namedQueueRecorder.addRecord(rpcLogDetails);
665    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
666      Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
667      if (slowLogPayload.isPresent()) {
668        return slowLogPayload.get().hasScan();
669      }
670      return false;
671    }));
672  }
673
674  @Test
675  public void testOnlineSlowLogRequestAttributes() throws Exception {
676    Configuration conf = applySlowLogRecorderConf(1);
677    Constructor<NamedQueueRecorder> constructor =
678      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
679    constructor.setAccessible(true);
680    namedQueueRecorder = constructor.newInstance(conf);
681    AdminProtos.SlowLogResponseRequest request =
682      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
683
684    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
685    LOG.debug("Initially ringbuffer of Slow Log records is empty");
686    RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
687    namedQueueRecorder.addRecord(rpcLogDetails);
688    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
689      Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
690      if (slowLogPayload.isPresent() && !slowLogPayload.get().getRequestAttributeList().isEmpty()) {
691        return slowLogPayload.get().getRequestAttributeList().containsAll(REQUEST_HEADERS);
692      }
693      return false;
694    }));
695  }
696
697  @Test
698  public void testOnlineSlowLogConnectionAttributes() throws Exception {
699    Configuration conf = applySlowLogRecorderConf(1);
700    Constructor<NamedQueueRecorder> constructor =
701      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
702    constructor.setAccessible(true);
703    namedQueueRecorder = constructor.newInstance(conf);
704    AdminProtos.SlowLogResponseRequest request =
705      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
706
707    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
708    LOG.debug("Initially ringbuffer of Slow Log records is empty");
709    RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
710    namedQueueRecorder.addRecord(rpcLogDetails);
711    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
712      Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
713      if (
714        slowLogPayload.isPresent() && !slowLogPayload.get().getConnectionAttributeList().isEmpty()
715      ) {
716        return slowLogPayload.get().getConnectionAttributeList().containsAll(CONNECTION_HEADERS);
717      }
718      return false;
719    }));
720  }
721
722  static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className,
723    int forcedParamIndex) {
724    RpcCall rpcCall = getRpcCall(userName, forcedParamIndex);
725    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className, true,
726      true);
727  }
728
729  static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
730    RpcCall rpcCall = getRpcCall(userName);
731    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className, true,
732      true);
733  }
734
735  private static RpcLogDetails getRpcLogDetailsOfScan() {
736    // forcedParamIndex of 0 results in a ScanRequest
737    return getRpcLogDetails("userName_1", "client_1", "class_1", 0);
738  }
739
740  private RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className,
741    boolean isSlowLog, boolean isLargeLog) {
742    RpcCall rpcCall = getRpcCall(userName);
743    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className,
744      isSlowLog, isLargeLog);
745  }
746
747  private static RpcCall getRpcCall(String userName) {
748    return getRpcCall(userName, Optional.empty());
749  }
750
751  private static RpcCall getRpcCall(String userName, int forcedParamIndex) {
752    return getRpcCall(userName, Optional.of(forcedParamIndex));
753  }
754
755  @SuppressWarnings("checkstyle:methodlength")
756  private static RpcCall getRpcCall(String userName, Optional<Integer> forcedParamIndex) {
757    RpcCall rpcCall = new RpcCall() {
758      @Override
759      public BlockingService getService() {
760        return null;
761      }
762
763      @Override
764      public Descriptors.MethodDescriptor getMethod() {
765        return null;
766      }
767
768      @Override
769      public Message getParam() {
770        return getMessage(forcedParamIndex);
771      }
772
773      @Override
774      public ExtendedCellScanner getCellScanner() {
775        return null;
776      }
777
778      @Override
779      public long getReceiveTime() {
780        return 0;
781      }
782
783      @Override
784      public long getStartTime() {
785        return 0;
786      }
787
788      @Override
789      public void setStartTime(long startTime) {
790      }
791
792      @Override
793      public int getTimeout() {
794        return 0;
795      }
796
797      @Override
798      public int getPriority() {
799        return 0;
800      }
801
802      @Override
803      public long getDeadline() {
804        return 0;
805      }
806
807      @Override
808      public long getSize() {
809        return 0;
810      }
811
812      @Override
813      public RPCProtos.RequestHeader getHeader() {
814        return null;
815      }
816
817      @Override
818      public Map<String, byte[]> getConnectionAttributes() {
819        return CONNECTION_HEADERS.stream().collect(Collectors
820          .toMap(HBaseProtos.NameBytesPair::getName, pair -> pair.getValue().toByteArray()));
821      }
822
823      @Override
824      public Map<String, byte[]> getRequestAttributes() {
825        return REQUEST_HEADERS.stream().collect(Collectors.toMap(HBaseProtos.NameBytesPair::getName,
826          pair -> pair.getValue().toByteArray()));
827      }
828
829      @Override
830      public byte[] getRequestAttribute(String key) {
831        return null;
832      }
833
834      @Override
835      public int getRemotePort() {
836        return 0;
837      }
838
839      @Override
840      public void setResponse(Message param, ExtendedCellScanner cells, Throwable errorThrowable,
841        String error) {
842      }
843
844      @Override
845      public void sendResponseIfReady() throws IOException {
846      }
847
848      @Override
849      public void cleanup() {
850      }
851
852      @Override
853      public String toShortString() {
854        return null;
855      }
856
857      @Override
858      public long disconnectSince() {
859        return 0;
860      }
861
862      @Override
863      public boolean isClientCellBlockSupported() {
864        return false;
865      }
866
867      @Override
868      public Optional<User> getRequestUser() {
869        return getUser(userName);
870      }
871
872      @Override
873      public Optional<X509Certificate[]> getClientCertificateChain() {
874        return Optional.empty();
875      }
876
877      @Override
878      public InetAddress getRemoteAddress() {
879        return null;
880      }
881
882      @Override
883      public HBaseProtos.VersionInfo getClientVersionInfo() {
884        return null;
885      }
886
887      @Override
888      public void setCallBack(RpcCallback callback) {
889      }
890
891      @Override
892      public boolean isRetryImmediatelySupported() {
893        return false;
894      }
895
896      @Override
897      public long getResponseCellSize() {
898        return 0;
899      }
900
901      @Override
902      public void incrementResponseCellSize(long cellSize) {
903      }
904
905      @Override
906      public long getBlockBytesScanned() {
907        return 0;
908      }
909
910      @Override
911      public void incrementBlockBytesScanned(long blockSize) {
912      }
913
914      @Override
915      public long getResponseExceptionSize() {
916        return 0;
917      }
918
919      @Override
920      public void incrementResponseExceptionSize(long exceptionSize) {
921      }
922
923      @Override
924      public void updateFsReadTime(long latencyMillis) {
925
926      }
927
928      @Override
929      public long getFsReadTime() {
930        return 0;
931      }
932    };
933    return rpcCall;
934  }
935
936  private static Message getMessage(Optional<Integer> forcedParamIndex) {
937
938    i = (i + 1) % 3;
939
940    Message message = null;
941
942    switch (forcedParamIndex.orElse(i)) {
943
944      case 0: {
945        message = ClientProtos.ScanRequest.newBuilder()
946          .setRegion(
947            HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region1"))
948              .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME).build())
949          .build();
950        break;
951      }
952      case 1: {
953        message = ClientProtos.MutateRequest.newBuilder()
954          .setRegion(
955            HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region2"))
956              .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
957          .setMutation(ClientProtos.MutationProto.newBuilder()
958            .setRow(ByteString.copyFromUtf8("row123")).build())
959          .build();
960        break;
961      }
962      case 2: {
963        message = ClientProtos.GetRequest.newBuilder()
964          .setRegion(
965            HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region2"))
966              .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
967          .setGet(ClientProtos.Get.newBuilder().setRow(ByteString.copyFromUtf8("row123")).build())
968          .build();
969        break;
970      }
971      default:
972        throw new RuntimeException("Not supposed to get here?");
973    }
974
975    return message;
976
977  }
978
979  private static Optional<User> getUser(String userName) {
980
981    return Optional.of(new User() {
982      @Override
983      public String getShortName() {
984        return userName;
985      }
986
987      @Override
988      public <T> T runAs(PrivilegedAction<T> action) {
989        return null;
990      }
991
992      @Override
993      public <T> T runAs(PrivilegedExceptionAction<T> action) {
994        return null;
995      }
996    });
997
998  }
999
1000}