/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;

/**
 * Default {@link HBaseRpcController} implementation. Get instances via
 * {@link RpcControllerFactory} on client-side.
 * <p>
 * The completion/cancellation state ({@code done}, {@code cancelled}, the cancellation callbacks)
 * is guarded by this object's monitor. The remaining fields are plain, unsynchronized fields.
 * @see RpcControllerFactory
 */
@InterfaceAudience.Private
public class HBaseRpcControllerImpl implements HBaseRpcController {
  /**
   * The time, in ms before the call should expire. {@code null} means no timeout has been set.
   */
  private Integer callTimeout;

  /** Set once the call has completed, failed or been cancelled; guarded by {@code this}. */
  private boolean done = false;

  /** Set by {@link #startCancel()}; guarded by {@code this}. */
  private boolean cancelled = false;

  /** Callbacks to run when the call is cancelled; guarded by {@code this}. */
  private final List<RpcCallback<Object>> cancellationCbs = new ArrayList<>();

  /** The failure recorded via one of the {@code setFailed} methods, if any. */
  private IOException exception;

  private TableName tableName;

  /**
   * Rpc target Region's RegionInfo we are going against. May be null.
   * @see #hasRegionInfo()
   */
  private RegionInfo regionInfo;

  /**
   * Priority to set on this request. Set it here in controller so available composing the request.
   * This is the ordained way of setting priorities going forward. We will be undoing the old
   * annotation-based mechanism.
   */
  private int priority = HConstants.PRIORITY_UNSET;

  /**
   * They are optionally set on construction, cleared after we make the call, and then optionally
   * set on response with the result. We use this lowest common denominator access to Cells because
   * sometimes the scanner is backed by a List of Cells and other times, it is backed by an encoded
   * block that implements CellScanner.
   */
  private CellScanner cellScanner;

  private Map<String, byte[]> requestAttributes = Collections.emptyMap();

  /** Creates a controller with neither a target region nor a cell scanner. */
  public HBaseRpcControllerImpl() {
    this(null, (CellScanner) null);
  }

  /**
   * Used server-side. Clients should go via {@link RpcControllerFactory}
   * @param cellScanner the cells to carry with the call; may be null
   */
  public HBaseRpcControllerImpl(final CellScanner cellScanner) {
    this(null, cellScanner);
  }

  /**
   * @param regionInfo  the region the call targets; may be null
   * @param cellScanner the cells to carry with the call; may be null
   */
  HBaseRpcControllerImpl(RegionInfo regionInfo, final CellScanner cellScanner) {
    this.cellScanner = cellScanner;
    this.regionInfo = regionInfo;
  }

  /**
   * @param regionInfo    the region the call targets; may be null
   * @param cellIterables the cells to carry with the call; when null no cell scanner is created
   */
  HBaseRpcControllerImpl(RegionInfo regionInfo, final List<CellScannable> cellIterables) {
    this.cellScanner = cellIterables == null ? null : CellUtil.createCellScanner(cellIterables);
    // Keep the caller-supplied region so hasRegionInfo()/getRegionInfo() behave the same as for
    // the CellScanner-based constructor; previously the argument was silently discarded.
    this.regionInfo = regionInfo;
  }

  /** Returns true if a target region has been recorded on this controller. */
  @Override
  public boolean hasRegionInfo() {
    return this.regionInfo != null;
  }

  /** Returns the target region, or null if none was recorded. */
  @Override
  public RegionInfo getRegionInfo() {
    return this.regionInfo;
  }

  /** Returns One-shot cell scanner (you cannot back it up and restart) */
  @Override
  public CellScanner cellScanner() {
    return cellScanner;
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "The only possible race method is startCancel")
  @Override
  public void setCellScanner(final CellScanner cellScanner) {
    this.cellScanner = cellScanner;
  }

  /**
   * Raises the priority of this request. The stored priority is never lowered: the larger of the
   * current and the supplied value is kept.
   */
  @Override
  public void setPriority(int priority) {
    this.priority = Math.max(this.priority, priority);
  }

  /**
   * Sets the priority from the table: {@link HConstants#SYSTEMTABLE_QOS} for a system table,
   * {@link HConstants#NORMAL_QOS} otherwise (including when {@code tn} is null).
   */
  @Override
  public void setPriority(final TableName tn) {
    setPriority(
      tn != null && tn.isSystemTable() ? HConstants.SYSTEMTABLE_QOS : HConstants.NORMAL_QOS);
  }

  /** Returns the priority, or {@link HConstants#NORMAL_QOS} when the stored value is negative. */
  @Override
  public int getPriority() {
    return priority < 0 ? HConstants.NORMAL_QOS : priority;
  }

  /**
   * Returns this controller to its initial state so it can be reused for another call. Request
   * attributes are left untouched.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "The only possible race method is startCancel")
  @Override
  public void reset() {
    // Restore the same sentinel a freshly constructed controller starts with, rather than a
    // literal 0. getPriority() maps any negative value to NORMAL_QOS.
    priority = HConstants.PRIORITY_UNSET;
    cellScanner = null;
    exception = null;
    callTimeout = null;
    regionInfo = null;
    tableName = null;
    // In the implementations of some callable with replicas, rpc calls are executed in a executor
    // and we could cancel the operation from outside which means there could be a race between
    // reset and startCancel. Although I think the race should be handled by the callable since the
    // reset may clear the cancel state...
    synchronized (this) {
      done = false;
      cancelled = false;
      cancellationCbs.clear();
    }
  }

  /** Returns the call timeout in ms, or 0 when none has been set. */
  @Override
  public int getCallTimeout() {
    return callTimeout != null ? callTimeout : 0;
  }

  @Override
  public void setCallTimeout(int callTimeout) {
    this.callTimeout = callTimeout;
  }

  /** Returns true if a call timeout has been set since construction or the last reset. */
  @Override
  public boolean hasCallTimeout() {
    return callTimeout != null;
  }

  @Override
  public Map<String, byte[]> getRequestAttributes() {
    return requestAttributes;
  }

  @Override
  public void setRequestAttributes(Map<String, byte[]> requestAttributes) {
    this.requestAttributes = requestAttributes;
  }

  /** Returns the failure message, or null if the call is not done or has no recorded failure. */
  @Override
  public synchronized String errorText() {
    if (!done || exception == null) {
      return null;
    }
    return exception.getMessage();
  }

  /** Returns true if the call is done and a failure has been recorded. */
  @Override
  public synchronized boolean failed() {
    return done && this.exception != null;
  }

  @Override
  public synchronized boolean isCanceled() {
    return cancelled;
  }

  /**
   * Registers a callback to run when the call is cancelled. The callback is dropped if the call
   * is already done.
   */
  @Override
  public void notifyOnCancel(RpcCallback<Object> callback) {
    synchronized (this) {
      // NOTE(review): startCancel() sets done and cancelled together, so once cancelled this
      // early return appears to win and the direct-run path below looks unreachable - confirm
      // whether an already-cancelled controller is meant to run the callback immediately.
      if (done) {
        return;
      }
      if (!cancelled) {
        cancellationCbs.add(callback);
        return;
      }
    }
    // run it directly as we have already been cancelled.
    callback.run(null);
  }

  /** Marks the call as failed with the given reason. No-op if the call is already done. */
  @Override
  public synchronized void setFailed(String reason) {
    if (done) {
      return;
    }
    done = true;
    exception = new IOException(reason);
  }

  /** Marks the call as failed with the given exception. No-op if the call is already done. */
  @Override
  public synchronized void setFailed(IOException e) {
    if (done) {
      return;
    }
    done = true;
    exception = e;
  }

  /** Returns the recorded failure if the call is done, otherwise null. */
  @Override
  public synchronized IOException getFailed() {
    return done ? exception : null;
  }

  /**
   * Marks the call as done and stores the response cells. No-op if the call is already done.
   */
  @Override
  public synchronized void setDone(CellScanner cellScanner) {
    if (done) {
      return;
    }
    done = true;
    this.cellScanner = cellScanner;
  }

  /**
   * Cancels the call and runs the registered cancellation callbacks. No-op if the call is already
   * done. The callbacks are invoked outside of the lock.
   */
  @Override
  public void startCancel() {
    // As said above in the comment of reset, the cancellationCbs maybe cleared by reset, so we need
    // to copy it.
    List<RpcCallback<Object>> cbs;
    synchronized (this) {
      if (done) {
        return;
      }
      done = true;
      cancelled = true;
      cbs = new ArrayList<>(cancellationCbs);
    }
    for (RpcCallback<?> cb : cbs) {
      cb.run(null);
    }
  }

  /**
   * Runs {@code action} while holding this controller's lock, passing whether the call has
   * already been cancelled. When it has not, {@code callback} is registered first so that a later
   * {@link #startCancel()} will run it.
   * @throws IOException if {@code action} throws it
   */
  @Override
  public synchronized void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action)
    throws IOException {
    if (cancelled) {
      action.run(true);
    } else {
      cancellationCbs.add(callback);
      action.run(false);
    }
  }

  @Override
  public String toString() {
    return "HBaseRpcControllerImpl{" + "callTimeout=" + callTimeout + ", done=" + done
      + ", cancelled=" + cancelled + ", cancellationCbs=" + cancellationCbs + ", exception="
      + exception + ", regionInfo=" + regionInfo + ", priority=" + priority + ", cellScanner="
      + cellScanner + '}';
  }

  @Override
  public void setTableName(TableName tableName) {
    this.tableName = tableName;
  }

  @Override
  public TableName getTableName() {
    return tableName;
  }
}