001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.namequeues.impl; 019 020import java.util.Arrays; 021import java.util.Collections; 022import java.util.List; 023import java.util.Queue; 024import java.util.stream.Collectors; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.client.BalancerRejection; 027import org.apache.hadoop.hbase.client.Connection; 028import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; 029import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails; 030import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; 031import org.apache.hadoop.hbase.namequeues.NamedQueueService; 032import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 033import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; 039import org.apache.hbase.thirdparty.com.google.common.collect.Queues; 040 041import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs; 042 043/** 044 * In-memory Queue service provider for Balancer Rejection events 045 */ 046@InterfaceAudience.Private 047public class BalancerRejectionQueueService implements NamedQueueService { 048 049 private static final Logger LOG = LoggerFactory.getLogger(BalancerRejectionQueueService.class); 050 051 private final boolean isBalancerRejectionRecording; 052 private static final String BALANCER_REJECTION_QUEUE_SIZE = 053 "hbase.master.balancer.rejection.queue.size"; 054 private static final int DEFAULT_BALANCER_REJECTION_QUEUE_SIZE = 250; 055 056 private final Queue<RecentLogs.BalancerRejection> balancerRejectionQueue; 057 058 public BalancerRejectionQueueService(Configuration conf) { 059 isBalancerRejectionRecording = 060 conf.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED, 061 BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED); 062 if (!isBalancerRejectionRecording) { 063 balancerRejectionQueue = null; 064 return; 065 } 066 final int queueSize = 067 conf.getInt(BALANCER_REJECTION_QUEUE_SIZE, DEFAULT_BALANCER_REJECTION_QUEUE_SIZE); 068 final EvictingQueue<RecentLogs.BalancerRejection> evictingQueue = 069 EvictingQueue.create(queueSize); 070 balancerRejectionQueue = Queues.synchronizedQueue(evictingQueue); 071 } 072 073 @Override 074 public NamedQueuePayload.NamedQueueEvent getEvent() { 075 return NamedQueuePayload.NamedQueueEvent.BALANCE_REJECTION; 076 } 077 078 @Override 079 public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { 080 if (!isBalancerRejectionRecording) { 081 return; 082 } 083 if (!(namedQueuePayload instanceof BalancerRejectionDetails)) { 084 LOG.warn("BalancerRejectionQueueService: NamedQueuePayload is not of type" 085 + " BalancerRejectionDetails."); 086 return; 087 } 088 BalancerRejectionDetails balancerRejectionDetails = 089 (BalancerRejectionDetails) namedQueuePayload; 090 BalancerRejection balancerRejectionRecord = balancerRejectionDetails.getBalancerRejection(); 091 RecentLogs.BalancerRejection BalancerRejection = 092 RecentLogs.BalancerRejection.newBuilder().setReason(balancerRejectionRecord.getReason()) 093 .addAllCostFuncInfo(balancerRejectionRecord.getCostFuncInfoList()).build(); 094 balancerRejectionQueue.add(BalancerRejection); 095 } 096 097 @Override 098 public boolean clearNamedQueue() { 099 if (!isBalancerRejectionRecording) { 100 return false; 101 } 102 LOG.debug("Received request to clean up balancer rejection queue."); 103 balancerRejectionQueue.clear(); 104 return true; 105 } 106 107 @Override 108 public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { 109 if (!isBalancerRejectionRecording) { 110 return null; 111 } 112 List<RecentLogs.BalancerRejection> balancerRejections = 113 Arrays.stream(balancerRejectionQueue.toArray(new RecentLogs.BalancerRejection[0])) 114 .collect(Collectors.toList()); 115 // latest records should be displayed first, hence reverse order sorting 116 Collections.reverse(balancerRejections); 117 int limit = balancerRejections.size(); 118 if (request.getBalancerRejectionsRequest().hasLimit()) { 119 limit = 120 Math.min(request.getBalancerRejectionsRequest().getLimit(), balancerRejections.size()); 121 } 122 // filter limit if provided 123 balancerRejections = balancerRejections.subList(0, limit); 124 final NamedQueueGetResponse namedQueueGetResponse = new NamedQueueGetResponse(); 125 namedQueueGetResponse.setNamedQueueEvent(BalancerRejectionDetails.BALANCER_REJECTION_EVENT); 126 namedQueueGetResponse.setBalancerRejections(balancerRejections); 127 return namedQueueGetResponse; 128 } 129 130 @Override 131 public void persistAll(Connection connection) { 132 // no-op for now 133 } 134 135}