001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.lang.reflect.Method;
021import java.util.HashMap;
022import java.util.Map;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.ipc.PriorityFunction;
026import org.apache.hadoop.hbase.ipc.QosPriority;
027import org.apache.hadoop.hbase.security.User;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032import org.apache.hbase.thirdparty.com.google.protobuf.Message;
033import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
034
035import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
046
047/**
048 * Reads special method annotations and table names to figure a priority for use by QoS facility in
049 * ipc; e.g: rpcs to hbase:meta get priority.
050 */
051// TODO: Remove. This is doing way too much work just to figure a priority. Do as Elliott
052// suggests and just have the client specify a priority.
053
054// The logic for figuring out high priority RPCs is as follows:
055// 1. if the method is annotated with a QosPriority of QOS_HIGH,
056// that is honored
057// 2. parse out the protobuf message and see if the request is for meta
058// region, and if so, treat it as a high priority RPC
059// Some optimizations for (2) are done here -
060// Clients send the argument classname as part of making the RPC. The server
061// decides whether to deserialize the proto argument message based on the
062// pre-established set of argument classes (knownArgumentClasses below).
063// This prevents the server from having to deserialize all proto argument
064// messages prematurely.
065// All the argument classes declare a 'getRegion' method that returns a
066// RegionSpecifier object. Methods can be invoked on the returned object
067// to figure out whether it is a meta region or not.
068@InterfaceAudience.Private
069public class AnnotationReadingPriorityFunction implements PriorityFunction {
070  private static final Logger LOG =
071    LoggerFactory.getLogger(AnnotationReadingPriorityFunction.class.getName());
072
073  /** Used to control the scan delay, currently sqrt(numNextCall * weight) */
074  public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";
075
076  protected final Map<String, Integer> annotatedQos;
077  // We need to mock the regionserver instance for some unit tests (set via
078  // setRegionServer method.
079  private RSRpcServices rpcServices;
080  @SuppressWarnings("unchecked")
081  private final Class<? extends Message>[] knownArgumentClasses =
082    new Class[] { GetRegionInfoRequest.class, GetStoreFileRequest.class, CloseRegionRequest.class,
083      FlushRegionRequest.class, CompactRegionRequest.class, GetRequest.class, MutateRequest.class,
084      ScanRequest.class };
085
086  // Some caches for helping performance
087  private final Map<String, Class<? extends Message>> argumentToClassMap = new HashMap<>();
088  private final Map<String, Map<Class<? extends Message>, Method>> methodMap = new HashMap<>();
089
090  private final float scanVirtualTimeWeight;
091
092  /**
093   * Calls {@link #AnnotationReadingPriorityFunction(RSRpcServices, Class)} using the result of
094   * {@code rpcServices#getClass()} The RPC server implementation
095   */
096  public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) {
097    this(rpcServices, rpcServices.getClass());
098  }
099
100  /**
101   * Constructs the priority function given the RPC server implementation and the annotations on the
102   * methods in the provided {@code clz}. The RPC server implementation The concrete RPC server
103   * implementation's class
104   */
105  public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices,
106    Class<? extends RSRpcServices> clz) {
107    Map<String, Integer> qosMap = new HashMap<>();
108    for (Method m : clz.getMethods()) {
109      QosPriority p = m.getAnnotation(QosPriority.class);
110      if (p != null) {
111        // Since we protobuf'd, and then subsequently, when we went with pb style, method names
112        // are capitalized. This meant that this brittle compare of method names gotten by
113        // reflection no longer matched the method names coming in over pb. TODO: Get rid of this
114        // check. For now, workaround is to capitalize the names we got from reflection so they
115        // have chance of matching the pb ones.
116        String capitalizedMethodName = capitalize(m.getName());
117        qosMap.put(capitalizedMethodName, p.priority());
118      }
119    }
120    this.rpcServices = rpcServices;
121    this.annotatedQos = qosMap;
122    if (methodMap.get("getRegion") == null) {
123      methodMap.put("hasRegion", new HashMap<>());
124      methodMap.put("getRegion", new HashMap<>());
125    }
126    for (Class<? extends Message> cls : knownArgumentClasses) {
127      argumentToClassMap.put(cls.getName(), cls);
128      try {
129        methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion"));
130        methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion"));
131      } catch (Exception e) {
132        throw new RuntimeException(e);
133      }
134    }
135
136    Configuration conf = rpcServices.getConfiguration();
137    scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
138  }
139
140  private String capitalize(final String s) {
141    StringBuilder strBuilder = new StringBuilder(s);
142    strBuilder.setCharAt(0, Character.toUpperCase(strBuilder.charAt(0)));
143    return strBuilder.toString();
144  }
145
146  /**
147   * Returns a 'priority' based on the request type. Currently the returned priority is used for
148   * queue selection. See the SimpleRpcScheduler as example. It maintains a queue per 'priory type'
149   * HIGH_QOS (meta requests), REPLICATION_QOS (replication requests), NORMAL_QOS (user requests).
150   */
151  @Override
152  public int getPriority(RequestHeader header, Message param, User user) {
153    int priorityByAnnotation = getAnnotatedPriority(header);
154
155    if (priorityByAnnotation >= 0) {
156      return priorityByAnnotation;
157    }
158    return getBasePriority(header, param);
159  }
160
161  /**
162   * See if the method has an annotation.
163   * @return Return the priority from the annotation. If there isn't an annotation, this returns
164   *         something below zero.
165   */
166  protected int getAnnotatedPriority(RequestHeader header) {
167    String methodName = header.getMethodName();
168    Integer priorityByAnnotation = annotatedQos.get(methodName);
169    if (priorityByAnnotation != null) {
170      return priorityByAnnotation;
171    }
172    return -1;
173  }
174
175  /**
176   * Get the priority for a given request from the header and the param This doesn't consider which
177   * user is sending the request at all. This doesn't consider annotations
178   */
179  protected int getBasePriority(RequestHeader header, Message param) {
180    if (param == null) {
181      return HConstants.NORMAL_QOS;
182    }
183
184    // Trust the client-set priorities if set
185    if (header.hasPriority()) {
186      return header.getPriority();
187    }
188    if (param instanceof BulkLoadHFileRequest) {
189      return HConstants.BULKLOAD_QOS;
190    }
191
192    String cls = param.getClass().getName();
193    Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
194    RegionSpecifier regionSpecifier = null;
195    // check whether the request has reference to meta region or now.
196    try {
197      // Check if the param has a region specifier; the pb methods are hasRegion and getRegion if
198      // hasRegion returns true. Not all listed methods have region specifier each time. For
199      // example, the ScanRequest has it on setup but thereafter relies on the scannerid rather than
200      // send the region over every time.
201      Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass);
202      if (hasRegion != null && (Boolean) hasRegion.invoke(param, (Object[]) null)) {
203        Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
204        regionSpecifier = (RegionSpecifier) getRegion.invoke(param, (Object[]) null);
205        Region region = rpcServices.getRegion(regionSpecifier);
206        if (region.getRegionInfo().getTable().isSystemTable()) {
207          if (LOG.isTraceEnabled()) {
208            LOG.trace(
209              "High priority because region=" + region.getRegionInfo().getRegionNameAsString());
210          }
211          return HConstants.SYSTEMTABLE_QOS;
212        }
213      }
214    } catch (Exception ex) {
215      // Not good throwing an exception out of here, a runtime anyways. Let the query go into the
216      // server and have it throw the exception if still an issue. Just mark it normal priority.
217      if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex);
218      return HConstants.NORMAL_QOS;
219    }
220
221    if (param instanceof ScanRequest) { // scanner methods...
222      ScanRequest request = (ScanRequest) param;
223      if (!request.hasScannerId()) {
224        return HConstants.NORMAL_QOS;
225      }
226      RegionScanner scanner = rpcServices.getScanner(request.getScannerId());
227      if (scanner != null && scanner.getRegionInfo().getTable().isSystemTable()) {
228        if (LOG.isTraceEnabled()) {
229          // Scanner requests are small in size so TextFormat version should not overwhelm log.
230          LOG.trace("High priority scanner request " + TextFormat.shortDebugString(request));
231        }
232        return HConstants.SYSTEMTABLE_QOS;
233      }
234    }
235
236    return HConstants.NORMAL_QOS;
237  }
238
239  /**
240   * Based on the request content, returns the deadline of the request.
241   * @return Deadline of this request. 0 now, otherwise msec of 'delay'
242   */
243  @Override
244  public long getDeadline(RequestHeader header, Message param) {
245    if (param instanceof ScanRequest) {
246      ScanRequest request = (ScanRequest) param;
247      if (!request.hasScannerId()) {
248        return 0;
249      }
250
251      // get the 'virtual time' of the scanner, and applies sqrt() to get a
252      // nice curve for the delay. More a scanner is used the less priority it gets.
253      // The weight is used to have more control on the delay.
254      long vtime = rpcServices.getScannerVirtualTime(request.getScannerId());
255      return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight));
256    }
257    return 0;
258  }
259
260  void setRegionServer(final HRegionServer hrs) {
261    this.rpcServices = hrs.getRSRpcServices();
262  }
263}