001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.lang.reflect.Method; 021import java.util.HashMap; 022import java.util.Map; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.hbase.HConstants; 025import org.apache.hadoop.hbase.ipc.PriorityFunction; 026import org.apache.hadoop.hbase.ipc.QosPriority; 027import org.apache.hadoop.hbase.security.User; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032import org.apache.hbase.thirdparty.com.google.protobuf.Message; 033import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 034 035import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 036import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 037import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 043import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 044import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 045import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 046 047/** 048 * Reads special method annotations and table names to figure a priority for use by QoS facility in 049 * ipc; e.g: rpcs to hbase:meta get priority. 050 */ 051// TODO: Remove. This is doing way too much work just to figure a priority. Do as Elliott 052// suggests and just have the client specify a priority. 053 054// The logic for figuring out high priority RPCs is as follows: 055// 1. if the method is annotated with a QosPriority of QOS_HIGH, 056// that is honored 057// 2. parse out the protobuf message and see if the request is for meta 058// region, and if so, treat it as a high priority RPC 059// Some optimizations for (2) are done here - 060// Clients send the argument classname as part of making the RPC. The server 061// decides whether to deserialize the proto argument message based on the 062// pre-established set of argument classes (knownArgumentClasses below). 063// This prevents the server from having to deserialize all proto argument 064// messages prematurely. 065// All the argument classes declare a 'getRegion' method that returns a 066// RegionSpecifier object. Methods can be invoked on the returned object 067// to figure out whether it is a meta region or not. 068@InterfaceAudience.Private 069public class AnnotationReadingPriorityFunction implements PriorityFunction { 070 private static final Logger LOG = 071 LoggerFactory.getLogger(AnnotationReadingPriorityFunction.class.getName()); 072 073 /** Used to control the scan delay, currently sqrt(numNextCall * weight) */ 074 public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight"; 075 076 protected final Map<String, Integer> annotatedQos; 077 // We need to mock the regionserver instance for some unit tests (set via 078 // setRegionServer method. 079 private RSRpcServices rpcServices; 080 @SuppressWarnings("unchecked") 081 private final Class<? extends Message>[] knownArgumentClasses = 082 new Class[] { GetRegionInfoRequest.class, GetStoreFileRequest.class, CloseRegionRequest.class, 083 FlushRegionRequest.class, CompactRegionRequest.class, GetRequest.class, MutateRequest.class, 084 ScanRequest.class }; 085 086 // Some caches for helping performance 087 private final Map<String, Class<? extends Message>> argumentToClassMap = new HashMap<>(); 088 private final Map<String, Map<Class<? extends Message>, Method>> methodMap = new HashMap<>(); 089 090 private final float scanVirtualTimeWeight; 091 092 /** 093 * Calls {@link #AnnotationReadingPriorityFunction(RSRpcServices, Class)} using the result of 094 * {@code rpcServices#getClass()} The RPC server implementation 095 */ 096 public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) { 097 this(rpcServices, rpcServices.getClass()); 098 } 099 100 /** 101 * Constructs the priority function given the RPC server implementation and the annotations on the 102 * methods in the provided {@code clz}. The RPC server implementation The concrete RPC server 103 * implementation's class 104 */ 105 public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices, 106 Class<? extends RSRpcServices> clz) { 107 Map<String, Integer> qosMap = new HashMap<>(); 108 for (Method m : clz.getMethods()) { 109 QosPriority p = m.getAnnotation(QosPriority.class); 110 if (p != null) { 111 // Since we protobuf'd, and then subsequently, when we went with pb style, method names 112 // are capitalized. This meant that this brittle compare of method names gotten by 113 // reflection no longer matched the method names coming in over pb. TODO: Get rid of this 114 // check. For now, workaround is to capitalize the names we got from reflection so they 115 // have chance of matching the pb ones. 116 String capitalizedMethodName = capitalize(m.getName()); 117 qosMap.put(capitalizedMethodName, p.priority()); 118 } 119 } 120 this.rpcServices = rpcServices; 121 this.annotatedQos = qosMap; 122 if (methodMap.get("getRegion") == null) { 123 methodMap.put("hasRegion", new HashMap<>()); 124 methodMap.put("getRegion", new HashMap<>()); 125 } 126 for (Class<? extends Message> cls : knownArgumentClasses) { 127 argumentToClassMap.put(cls.getName(), cls); 128 try { 129 methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion")); 130 methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion")); 131 } catch (Exception e) { 132 throw new RuntimeException(e); 133 } 134 } 135 136 Configuration conf = rpcServices.getConfiguration(); 137 scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f); 138 } 139 140 private String capitalize(final String s) { 141 StringBuilder strBuilder = new StringBuilder(s); 142 strBuilder.setCharAt(0, Character.toUpperCase(strBuilder.charAt(0))); 143 return strBuilder.toString(); 144 } 145 146 /** 147 * Returns a 'priority' based on the request type. Currently the returned priority is used for 148 * queue selection. See the SimpleRpcScheduler as example. It maintains a queue per 'priory type' 149 * HIGH_QOS (meta requests), REPLICATION_QOS (replication requests), NORMAL_QOS (user requests). 150 */ 151 @Override 152 public int getPriority(RequestHeader header, Message param, User user) { 153 int priorityByAnnotation = getAnnotatedPriority(header); 154 155 if (priorityByAnnotation >= 0) { 156 return priorityByAnnotation; 157 } 158 return getBasePriority(header, param); 159 } 160 161 /** 162 * See if the method has an annotation. 163 * @return Return the priority from the annotation. If there isn't an annotation, this returns 164 * something below zero. 165 */ 166 protected int getAnnotatedPriority(RequestHeader header) { 167 String methodName = header.getMethodName(); 168 Integer priorityByAnnotation = annotatedQos.get(methodName); 169 if (priorityByAnnotation != null) { 170 return priorityByAnnotation; 171 } 172 return -1; 173 } 174 175 /** 176 * Get the priority for a given request from the header and the param This doesn't consider which 177 * user is sending the request at all. This doesn't consider annotations 178 */ 179 protected int getBasePriority(RequestHeader header, Message param) { 180 if (param == null) { 181 return HConstants.NORMAL_QOS; 182 } 183 184 // Trust the client-set priorities if set 185 if (header.hasPriority()) { 186 return header.getPriority(); 187 } 188 if (param instanceof BulkLoadHFileRequest) { 189 return HConstants.BULKLOAD_QOS; 190 } 191 192 String cls = param.getClass().getName(); 193 Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls); 194 RegionSpecifier regionSpecifier = null; 195 // check whether the request has reference to meta region or now. 196 try { 197 // Check if the param has a region specifier; the pb methods are hasRegion and getRegion if 198 // hasRegion returns true. Not all listed methods have region specifier each time. For 199 // example, the ScanRequest has it on setup but thereafter relies on the scannerid rather than 200 // send the region over every time. 201 Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass); 202 if (hasRegion != null && (Boolean) hasRegion.invoke(param, (Object[]) null)) { 203 Method getRegion = methodMap.get("getRegion").get(rpcArgClass); 204 regionSpecifier = (RegionSpecifier) getRegion.invoke(param, (Object[]) null); 205 Region region = rpcServices.getRegion(regionSpecifier); 206 if (region.getRegionInfo().getTable().isSystemTable()) { 207 if (LOG.isTraceEnabled()) { 208 LOG.trace( 209 "High priority because region=" + region.getRegionInfo().getRegionNameAsString()); 210 } 211 return HConstants.SYSTEMTABLE_QOS; 212 } 213 } 214 } catch (Exception ex) { 215 // Not good throwing an exception out of here, a runtime anyways. Let the query go into the 216 // server and have it throw the exception if still an issue. Just mark it normal priority. 217 if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex); 218 return HConstants.NORMAL_QOS; 219 } 220 221 if (param instanceof ScanRequest) { // scanner methods... 222 ScanRequest request = (ScanRequest) param; 223 if (!request.hasScannerId()) { 224 return HConstants.NORMAL_QOS; 225 } 226 RegionScanner scanner = rpcServices.getScanner(request.getScannerId()); 227 if (scanner != null && scanner.getRegionInfo().getTable().isSystemTable()) { 228 if (LOG.isTraceEnabled()) { 229 // Scanner requests are small in size so TextFormat version should not overwhelm log. 230 LOG.trace("High priority scanner request " + TextFormat.shortDebugString(request)); 231 } 232 return HConstants.SYSTEMTABLE_QOS; 233 } 234 } 235 236 return HConstants.NORMAL_QOS; 237 } 238 239 /** 240 * Based on the request content, returns the deadline of the request. 241 * @return Deadline of this request. 0 now, otherwise msec of 'delay' 242 */ 243 @Override 244 public long getDeadline(RequestHeader header, Message param) { 245 if (param instanceof ScanRequest) { 246 ScanRequest request = (ScanRequest) param; 247 if (!request.hasScannerId()) { 248 return 0; 249 } 250 251 // get the 'virtual time' of the scanner, and applies sqrt() to get a 252 // nice curve for the delay. More a scanner is used the less priority it gets. 253 // The weight is used to have more control on the delay. 254 long vtime = rpcServices.getScannerVirtualTime(request.getScannerId()); 255 return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight)); 256 } 257 return 0; 258 } 259 260 void setRegionServer(final HRegionServer hrs) { 261 this.rpcServices = hrs.getRSRpcServices(); 262 } 263}