001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.throttle; 019 020import java.util.concurrent.ConcurrentHashMap; 021import java.util.concurrent.ConcurrentMap; 022import org.apache.hadoop.conf.Configured; 023import org.apache.hadoop.hbase.HBaseInterfaceAudience; 024import org.apache.hadoop.hbase.Stoppable; 025import org.apache.hadoop.hbase.regionserver.RegionServerServices; 026import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; 027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 033public abstract class PressureAwareThroughputController extends Configured 034 implements ThroughputController, Stoppable { 035 private static final Logger LOG = 036 LoggerFactory.getLogger(PressureAwareThroughputController.class); 037 038 /** 039 * Stores the information of one controlled compaction. 040 */ 041 private static final class ActiveOperation { 042 043 private final long startTime; 044 045 private long lastControlTime; 046 047 private long lastControlSize; 048 049 private long totalSize; 050 051 private long numberOfSleeps; 052 053 private long totalSleepTime; 054 055 // prevent too many debug log 056 private long lastLogTime; 057 058 ActiveOperation() { 059 long currentTime = EnvironmentEdgeManager.currentTime(); 060 this.startTime = currentTime; 061 this.lastControlTime = currentTime; 062 this.lastLogTime = currentTime; 063 } 064 } 065 066 protected long maxThroughputUpperBound; 067 068 protected long maxThroughputLowerBound; 069 070 protected OffPeakHours offPeakHours; 071 072 protected long controlPerSize; 073 074 protected int tuningPeriod; 075 076 private volatile double maxThroughput; 077 private volatile double maxThroughputPerOperation; 078 079 protected final ConcurrentMap<String, ActiveOperation> activeOperations = 080 new ConcurrentHashMap<>(); 081 082 @Override 083 public abstract void setup(final RegionServerServices server); 084 085 protected String throughputDesc(long deltaSize, long elapsedTime) { 086 return throughputDesc((double) deltaSize / elapsedTime * 1000); 087 } 088 089 protected String throughputDesc(double speed) { 090 if (speed >= 1E15) { // large enough to say it is unlimited 091 return "unlimited"; 092 } else { 093 return String.format("%.2f MB/second", speed / 1024 / 1024); 094 } 095 } 096 097 @Override 098 public void start(String opName) { 099 activeOperations.put(opName, new ActiveOperation()); 100 maxThroughputPerOperation = getMaxThroughput() / activeOperations.size(); 101 } 102 103 @Override 104 public long control(String opName, long size) throws InterruptedException { 105 ActiveOperation operation = activeOperations.get(opName); 106 operation.totalSize += size; 107 long deltaSize = operation.totalSize - operation.lastControlSize; 108 if (deltaSize < controlPerSize) { 109 return 0; 110 } 111 long now = EnvironmentEdgeManager.currentTime(); 112 long minTimeAllowed = (long) (deltaSize / maxThroughputPerOperation * 1000); // ms 113 long elapsedTime = now - operation.lastControlTime; 114 operation.lastControlSize = operation.totalSize; 115 if (elapsedTime >= minTimeAllowed) { 116 operation.lastControlTime = EnvironmentEdgeManager.currentTime(); 117 return 0; 118 } 119 // too fast 120 long sleepTime = minTimeAllowed - elapsedTime; 121 if (LOG.isDebugEnabled()) { 122 // do not log too much 123 if (now - operation.lastLogTime > 5L * 1000) { 124 LOG.debug("deltaSize: " + deltaSize + " bytes; elapseTime: " + elapsedTime + " ns"); 125 LOG.debug(opName + " sleep=" + sleepTime + "ms because current throughput is " 126 + throughputDesc(deltaSize, elapsedTime) + ", max allowed is " 127 + throughputDesc(maxThroughputPerOperation) + ", already slept " 128 + operation.numberOfSleeps + " time(s) and total slept time is " 129 + operation.totalSleepTime + " ms till now."); 130 operation.lastLogTime = now; 131 } 132 } 133 Thread.sleep(sleepTime); 134 operation.numberOfSleeps++; 135 operation.totalSleepTime += sleepTime; 136 operation.lastControlTime = EnvironmentEdgeManager.currentTime(); 137 return sleepTime; 138 } 139 140 @Override 141 public void finish(String opName) { 142 ActiveOperation operation = activeOperations.remove(opName); 143 maxThroughputPerOperation = getMaxThroughput() / activeOperations.size(); 144 long elapsedTime = EnvironmentEdgeManager.currentTime() - operation.startTime; 145 LOG.info(opName + " average throughput is " + throughputDesc(operation.totalSize, elapsedTime) 146 + ", slept " + operation.numberOfSleeps + " time(s) and total slept time is " 147 + operation.totalSleepTime + " ms. " + activeOperations.size() 148 + " active operations remaining, total limit is " + throughputDesc(getMaxThroughput())); 149 } 150 151 private volatile boolean stopped = false; 152 153 @Override 154 public void stop(String why) { 155 stopped = true; 156 } 157 158 @Override 159 public boolean isStopped() { 160 return stopped; 161 } 162 163 public double getMaxThroughput() { 164 return maxThroughput; 165 } 166 167 public void setMaxThroughput(double maxThroughput) { 168 this.maxThroughput = maxThroughput; 169 maxThroughputPerOperation = getMaxThroughput() / activeOperations.size(); 170 } 171}