001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.namequeues; 019 020import com.lmax.disruptor.BlockingWaitStrategy; 021import com.lmax.disruptor.RingBuffer; 022import com.lmax.disruptor.dsl.Disruptor; 023import com.lmax.disruptor.dsl.ProducerType; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.client.Connection; 026import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 027import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 028import org.apache.hadoop.hbase.util.Threads; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.apache.yetus.audience.InterfaceStability; 031 032import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 033import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 034 035/** 036 * NamedQueue recorder that maintains various named queues. The service uses LMAX Disruptor to save 037 * queue records which are then consumed by a queue and based on the ring buffer size, the available 038 * records are then fetched from the queue in thread-safe manner. 039 */ 040@InterfaceAudience.Private 041@InterfaceStability.Evolving 042public class NamedQueueRecorder { 043 044 private final Disruptor<RingBufferEnvelope> disruptor; 045 private final LogEventHandler logEventHandler; 046 047 private static volatile NamedQueueRecorder namedQueueRecorder; 048 private static boolean isInit = false; 049 private static final Object LOCK = new Object(); 050 051 /** 052 * Initialize disruptor with configurable ringbuffer size 053 */ 054 private NamedQueueRecorder(Configuration conf) { 055 056 // This is the 'writer' -- a single threaded executor. This single thread consumes what is 057 // put on the ringbuffer. 058 final String hostingThreadName = Thread.currentThread().getName(); 059 060 int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024); 061 062 // disruptor initialization with BlockingWaitStrategy 063 this.disruptor = new Disruptor<>(RingBufferEnvelope::new, getEventCount(eventCount), 064 new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".named-queue-events-pool-%d") 065 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 066 ProducerType.MULTI, new BlockingWaitStrategy()); 067 this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler()); 068 069 // initialize ringbuffer event handler 070 this.logEventHandler = new LogEventHandler(conf); 071 this.disruptor.handleEventsWith(new LogEventHandler[] { this.logEventHandler }); 072 this.disruptor.start(); 073 } 074 075 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_EXPOSE_REP", 076 justification = "singleton pattern") 077 public static NamedQueueRecorder getInstance(Configuration conf) { 078 if (namedQueueRecorder != null) { 079 return namedQueueRecorder; 080 } 081 synchronized (LOCK) { 082 if (!isInit) { 083 namedQueueRecorder = new NamedQueueRecorder(conf); 084 isInit = true; 085 } 086 } 087 return namedQueueRecorder; 088 } 089 090 // must be power of 2 for disruptor ringbuffer 091 private int getEventCount(int eventCount) { 092 Preconditions.checkArgument(eventCount >= 0, "hbase.namedqueue.ringbuffer.size must be > 0"); 093 int floor = Integer.highestOneBit(eventCount); 094 if (floor == eventCount) { 095 return floor; 096 } 097 // max capacity is 1 << 30 098 if (floor >= 1 << 29) { 099 return 1 << 30; 100 } 101 return floor << 1; 102 } 103 104 /** 105 * Retrieve in memory queue records from ringbuffer 106 * @param request namedQueue request with event type 107 * @return queue records from ringbuffer after filter (if applied) 108 */ 109 public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { 110 return this.logEventHandler.getNamedQueueRecords(request); 111 } 112 113 /** 114 * clears queue records from ringbuffer 115 * @param namedQueueEvent type of queue to clear 116 * @return true if slow log payloads are cleaned up or hbase.regionserver.slowlog.buffer.enabled 117 * is not set to true, false if failed to clean up slow logs 118 */ 119 public boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { 120 return this.logEventHandler.clearNamedQueue(namedQueueEvent); 121 } 122 123 /** 124 * Add various NamedQueue records to ringbuffer. Based on the type of the event (e.g slowLog), 125 * consumer of disruptor ringbuffer will have specific logic. This method is producer of disruptor 126 * ringbuffer which is initialized in NamedQueueRecorder constructor. 127 * @param namedQueuePayload namedQueue payload sent by client of ring buffer service 128 */ 129 public void addRecord(NamedQueuePayload namedQueuePayload) { 130 RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer(); 131 long seqId = ringBuffer.next(); 132 try { 133 ringBuffer.get(seqId).load(namedQueuePayload); 134 } finally { 135 ringBuffer.publish(seqId); 136 } 137 } 138 139 /** 140 * Add all in memory queue records to system table. The implementors can use system table or 141 * direct HDFS file or ZK as persistence system. 142 */ 143 public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent, Connection connection) { 144 if (this.logEventHandler != null) { 145 this.logEventHandler.persistAll(namedQueueEvent, connection); 146 } 147 } 148}