001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues;
019
020import com.lmax.disruptor.BlockingWaitStrategy;
021import com.lmax.disruptor.RingBuffer;
022import com.lmax.disruptor.dsl.Disruptor;
023import com.lmax.disruptor.dsl.ProducerType;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.client.Connection;
026import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
027import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
028import org.apache.hadoop.hbase.util.Threads;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.apache.yetus.audience.InterfaceStability;
031
032import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
033import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
034
035/**
036 * NamedQueue recorder that maintains various named queues. The service uses LMAX Disruptor to save
037 * queue records which are then consumed by a queue and based on the ring buffer size, the available
038 * records are then fetched from the queue in thread-safe manner.
039 */
040@InterfaceAudience.Private
041@InterfaceStability.Evolving
042public class NamedQueueRecorder {
043
044  private final Disruptor<RingBufferEnvelope> disruptor;
045  private final LogEventHandler logEventHandler;
046
047  private static volatile NamedQueueRecorder namedQueueRecorder;
048  private static boolean isInit = false;
049  private static final Object LOCK = new Object();
050
051  /**
052   * Initialize disruptor with configurable ringbuffer size
053   */
054  private NamedQueueRecorder(Configuration conf) {
055
056    // This is the 'writer' -- a single threaded executor. This single thread consumes what is
057    // put on the ringbuffer.
058    final String hostingThreadName = Thread.currentThread().getName();
059
060    int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024);
061
062    // disruptor initialization with BlockingWaitStrategy
063    this.disruptor = new Disruptor<>(RingBufferEnvelope::new, getEventCount(eventCount),
064      new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".named-queue-events-pool-%d")
065        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
066      ProducerType.MULTI, new BlockingWaitStrategy());
067    this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
068
069    // initialize ringbuffer event handler
070    this.logEventHandler = new LogEventHandler(conf);
071    this.disruptor.handleEventsWith(new LogEventHandler[] { this.logEventHandler });
072    this.disruptor.start();
073  }
074
075  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_EXPOSE_REP",
076      justification = "singleton pattern")
077  public static NamedQueueRecorder getInstance(Configuration conf) {
078    if (namedQueueRecorder != null) {
079      return namedQueueRecorder;
080    }
081    synchronized (LOCK) {
082      if (!isInit) {
083        namedQueueRecorder = new NamedQueueRecorder(conf);
084        isInit = true;
085      }
086    }
087    return namedQueueRecorder;
088  }
089
090  // must be power of 2 for disruptor ringbuffer
091  private int getEventCount(int eventCount) {
092    Preconditions.checkArgument(eventCount >= 0, "hbase.namedqueue.ringbuffer.size must be > 0");
093    int floor = Integer.highestOneBit(eventCount);
094    if (floor == eventCount) {
095      return floor;
096    }
097    // max capacity is 1 << 30
098    if (floor >= 1 << 29) {
099      return 1 << 30;
100    }
101    return floor << 1;
102  }
103
104  /**
105   * Retrieve in memory queue records from ringbuffer
106   * @param request namedQueue request with event type
107   * @return queue records from ringbuffer after filter (if applied)
108   */
109  public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
110    return this.logEventHandler.getNamedQueueRecords(request);
111  }
112
113  /**
114   * clears queue records from ringbuffer
115   * @param namedQueueEvent type of queue to clear
116   * @return true if slow log payloads are cleaned up or hbase.regionserver.slowlog.buffer.enabled
117   *         is not set to true, false if failed to clean up slow logs
118   */
119  public boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
120    return this.logEventHandler.clearNamedQueue(namedQueueEvent);
121  }
122
123  /**
124   * Add various NamedQueue records to ringbuffer. Based on the type of the event (e.g slowLog),
125   * consumer of disruptor ringbuffer will have specific logic. This method is producer of disruptor
126   * ringbuffer which is initialized in NamedQueueRecorder constructor.
127   * @param namedQueuePayload namedQueue payload sent by client of ring buffer service
128   */
129  public void addRecord(NamedQueuePayload namedQueuePayload) {
130    RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer();
131    long seqId = ringBuffer.next();
132    try {
133      ringBuffer.get(seqId).load(namedQueuePayload);
134    } finally {
135      ringBuffer.publish(seqId);
136    }
137  }
138
139  /**
140   * Add all in memory queue records to system table. The implementors can use system table or
141   * direct HDFS file or ZK as persistence system.
142   */
143  public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent, Connection connection) {
144    if (this.logEventHandler != null) {
145      this.logEventHandler.persistAll(namedQueueEvent, connection);
146    }
147  }
148}