/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.throttle;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns. It
 * makes a best-effort attempt to avoid exhausting all of the RS's handlers. When a lot of clients
 * write requests with dense (hundreds of) columns to a Store at the same time, the RS can end up
 * blocked because CSLM degrades when concurrency goes up. This is not a kind of throttling:
 * throttling is user-oriented, while StoreHotnessProtector is a system-oriented, RS-self-protecting
 * mechanism.
 * <p>
 * There are three key parameters:
 * <p>
 * 1. parallelPutToStoreThreadLimitCheckMinColumnCount: the protector only considers a put to a
 * Store when the number of columns written to that Store exceeds this threshold; 100 by default.
 * <p>
 * 2. parallelPutToStoreThreadLimit: the amount of concurrency allowed to write puts to a Store at
 * the same time.
 * <p>
 * 3. parallelPreparePutToStoreThreadLimit: the amount of concurrency allowed to prepare writing
 * puts to a Store at the same time. It is computed as the configured multiplier (2 by default)
 * times parallelPutToStoreThreadLimit.
 * <p>
 * Notice that our writing pipeline includes three key steps: MVCC acquire, writing MemStore, and
 * WAL. Limiting only the concurrency of writing puts to a Store (parallelPutToStoreThreadLimit) is
 * not enough, since the actual concurrency of puts may still exceed the limit when MVCC contention
 * or slow WAL sync happens. This is why parallelPreparePutToStoreThreadLimit is needed.
 * <p>
 * This protector is disabled by default, because hbase.region.store.parallel.put.limit defaults to
 * 0. It is turned on by setting hbase.region.store.parallel.put.limit to a value greater than 0,
 * and turned off again by setting it back to 0. Online configuration change is supported through
 * {@link #update(Configuration)}.
 */
@InterfaceAudience.Private
public class StoreHotnessProtector {
  private static final Logger LOG = LoggerFactory.getLogger(StoreHotnessProtector.class);

  /** Set once the "protector is disabled" message has been logged, to avoid repeating it. */
  private static volatile boolean loggedDisableMessage;

  /** Configuration key for {@link #parallelPutToStoreThreadLimit}. */
  public static final String PARALLEL_PUT_STORE_THREADS_LIMIT =
    "hbase.region.store.parallel.put.limit";
  /** Configuration key for the multiplier used to derive the prepare-put limit. */
  public static final String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER =
    "hbase.region.store.parallel.prepare.put.multiplier";
  /** Configuration key for {@link #parallelPutToStoreThreadLimitCheckMinColumnCount}. */
  public static final String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT =
    "hbase.region.store.parallel.put.limit.min.column.count";

  /** A limit of 0 leaves the protector disabled; see {@link #isEnable()}. */
  private static final int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 0;
  private static final int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
  private static final int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;

  /** Maximum value of a Store's current parallel put count before puts are rejected. */
  private volatile int parallelPutToStoreThreadLimit;

  /** Maximum number of in-flight prepared puts per column family before puts are rejected. */
  private volatile int parallelPreparePutToStoreThreadLimit;

  /** Only families receiving more cells than this in one call are checked. */
  private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount;

  /** Per column family count of puts between {@link #start(Map)} and {@link #finish(Map)}. */
  private final ConcurrentMap<byte[], AtomicInteger> preparePutToStoreMap =
    new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
  private final Region region;

  /**
   * Creates a protector for the given region, reading all limits from {@code conf}.
   * @param region the region whose stores are protected
   * @param conf   the configuration to read the limits from
   */
  public StoreHotnessProtector(Region region, Configuration conf) {
    init(conf);
    this.region = region;
  }

  /**
   * Loads the three limits from the configuration, and logs (at most once) a hint when the
   * resulting configuration leaves the protector disabled.
   * @param conf the configuration to read the limits from
   */
  public void init(Configuration conf) {
    this.parallelPutToStoreThreadLimit =
      conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT);
    // The prepare limit is not configured directly; it is a multiple of the put limit.
    this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER,
      DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit;
    this.parallelPutToStoreThreadLimitCheckMinColumnCount =
      conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT,
        DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM);

    if (!isEnable()) {
      logDisabledMessageOnce();
    }
  }

  /**
   * {@link #init(Configuration)} is called for every Store that opens on a RegionServer. Here we
   * make a lightweight attempt to log this message once per RegionServer, rather than per-Store.
   * The goal is just to draw attention to this feature if debugging overload due to heavy writes.
   */
  private static void logDisabledMessageOnce() {
    // Check-then-set without locking: concurrent callers may occasionally log more than once,
    // which is acceptable for a purely informational message.
    if (!loggedDisableMessage) {
      LOG.info(
        "StoreHotnessProtector is disabled. Set {} > 0 to enable, "
          + "which may help mitigate load under heavy write pressure.",
        PARALLEL_PUT_STORE_THREADS_LIMIT);
      loggedDisableMessage = true;
    }
  }

  /**
   * Re-reads the limits from the given configuration and resets all prepare-put counters.
   * @param conf the new configuration
   */
  public void update(Configuration conf) {
    init(conf);
    preparePutToStoreMap.clear();
    LOG.debug("update config: {}", this);
  }

  /**
   * Registers the start of a put and rejects it when a target Store is too busy. Does nothing
   * when the protector is disabled.
   * <p>
   * For every family that maps to an existing Store and carries more cells than the configured
   * minimum column count, the family's prepare-put counter is incremented and both limits are
   * checked. All families are examined before the exception is thrown, so the message lists every
   * busy Store.
   * <p>
   * Counters incremented by this call are not rolled back here when the exception is thrown.
   * NOTE(review): presumably the caller invokes {@link #finish(Map)} with the same map on
   * failure as well; confirm against the call sites.
   * @param familyMaps cells to be written, keyed by column family
   * @throws RegionTooBusyException if any checked Store is above the parallel put limit or the
   *                                prepare-put limit
   */
  public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
    if (!isEnable()) {
      return;
    }

    String tooBusyStore = null;
    boolean aboveParallelThreadLimit = false;
    boolean aboveParallelPrePutLimit = false;

    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
      Store store = this.region.getStore(e.getKey());
      if (store == null || e.getValue() == null) {
        continue;
      }

      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {

        // we need to try to add #preparePutCount at first because preparePutToStoreMap will be
        // cleared when changing the configuration.
        int preparePutCount = preparePutToStoreMap
          .computeIfAbsent(e.getKey(), key -> new AtomicInteger()).incrementAndGet();
        boolean storeAboveThread =
          store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit;
        boolean storeAbovePrePut = preparePutCount > this.parallelPreparePutToStoreThreadLimit;
        if (storeAboveThread || storeAbovePrePut) {
          // Accumulate a comma-separated list of the busy column families for the message.
          tooBusyStore = (tooBusyStore == null
            ? store.getColumnFamilyName()
            : tooBusyStore + "," + store.getColumnFamilyName());
        }
        aboveParallelThreadLimit |= storeAboveThread;
        aboveParallelPrePutLimit |= storeAbovePrePut;

        // Guard kept so that the arguments are not evaluated unless trace logging is on.
        if (LOG.isTraceEnabled()) {
          LOG.trace("{}: preparePutCount={}; currentParallelPutCount={}",
            store.getColumnFamilyName(), preparePutCount, store.getCurrentParallelPutCount());
        }
      }
    }

    if (aboveParallelThreadLimit || aboveParallelPrePutLimit) {
      String msg = "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":"
        + tooBusyStore + " Above "
        + (aboveParallelThreadLimit
          ? "parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")"
          : "")
        + (aboveParallelThreadLimit && aboveParallelPrePutLimit ? " or " : "")
        + (aboveParallelPrePutLimit
          ? "parallelPreparePutToStoreThreadLimit(" + this.parallelPreparePutToStoreThreadLimit
            + ")"
          : "");
      LOG.trace(msg);
      throw new RegionTooBusyException(msg);
    }
  }

  /**
   * Registers the end of a put by decrementing the prepare-put counter of every family that
   * {@link #start(Map)} would have counted for the same map. Does nothing when the protector is
   * disabled.
   * @param familyMaps cells that were to be written, keyed by column family
   */
  public void finish(Map<byte[], List<Cell>> familyMaps) {
    if (!isEnable()) {
      return;
    }

    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
      Store store = this.region.getStore(e.getKey());
      if (store == null || e.getValue() == null) {
        continue;
      }
      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
        AtomicInteger counter = preparePutToStoreMap.get(e.getKey());
        // preparePutToStoreMap will be cleared when changing the configuration, so the counter
        // may turn into a negative value. It will be inaccurate for a short time; this is a
        // trade-off for performance. A negative result is undone by incrementing again.
        if (counter != null && counter.decrementAndGet() < 0) {
          counter.incrementAndGet();
        }
      }
    }
  }

  /** Returns a description of the current limits and whether the protector is enabled. */
  @Override
  public String toString() {
    return "StoreHotnessProtector, parallelPutToStoreThreadLimit="
      + this.parallelPutToStoreThreadLimit + " ; minColumnNum="
      + this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit="
      + this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now "
      + (this.isEnable() ? "enable" : "disable");
  }

  /**
   * Returns whether the protector is active.
   * @return {@code true} when parallelPutToStoreThreadLimit is greater than 0
   */
  public boolean isEnable() {
    // feature is enabled when parallelPutToStoreThreadLimit > 0
    return this.parallelPutToStoreThreadLimit > 0;
  }

  /**
   * Returns the live map of prepare-put counters, not a copy. Package-private.
   * NOTE(review): presumably exposed for tests; confirm against usages.
   */
  Map<byte[], AtomicInteger> getPreparePutToStoreMap() {
    return preparePutToStoreMap;
  }

  /**
   * Aligned size computed from one object header, two references and three ints, which matches
   * the two reference fields and three int fields declared by this class.
   */
  public static final long FIXED_SIZE =
    ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT);
}