001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.List;
024import java.util.Map;
025import org.apache.hadoop.hbase.ExtendedCellScannable;
026import org.apache.hadoop.hbase.ExtendedCellScanner;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.PrivateCellUtil;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.RegionInfo;
031import org.apache.yetus.audience.InterfaceAudience;
032
033import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
034
035/**
036 * Get instances via {@link RpcControllerFactory} on client-side.
037 * @see RpcControllerFactory
038 */
039@InterfaceAudience.Private
040public class HBaseRpcControllerImpl implements HBaseRpcController {
041  /**
042   * The time, in ms before the call should expire.
043   */
044  private Integer callTimeout;
045
046  private boolean done = false;
047
048  private boolean cancelled = false;
049
050  private final List<RpcCallback<Object>> cancellationCbs = new ArrayList<>();
051
052  private IOException exception;
053
054  private TableName tableName;
055
056  /**
057   * Rpc target Region's RegionInfo we are going against. May be null.
058   * @see #hasRegionInfo()
059   */
060  private RegionInfo regionInfo;
061
062  /**
063   * Priority to set on this request. Set it here in controller so available composing the request.
064   * This is the ordained way of setting priorities going forward. We will be undoing the old
065   * annotation-based mechanism.
066   */
067  private int priority = HConstants.PRIORITY_UNSET;
068
069  /**
070   * They are optionally set on construction, cleared after we make the call, and then optionally
071   * set on response with the result. We use this lowest common denominator access to Cells because
072   * sometimes the scanner is backed by a List of Cells and other times, it is backed by an encoded
073   * block that implements CellScanner.
074   */
075  private ExtendedCellScanner cellScanner;
076
077  private Map<String, byte[]> requestAttributes = Collections.emptyMap();
078
079  public HBaseRpcControllerImpl() {
080    this(null, (ExtendedCellScanner) null);
081  }
082
083  /**
084   * Used server-side. Clients should go via {@link RpcControllerFactory}
085   */
086  public HBaseRpcControllerImpl(final ExtendedCellScanner cellScanner) {
087    this(null, cellScanner);
088  }
089
090  HBaseRpcControllerImpl(RegionInfo regionInfo, final ExtendedCellScanner cellScanner) {
091    this.cellScanner = cellScanner;
092    this.regionInfo = regionInfo;
093  }
094
095  HBaseRpcControllerImpl(RegionInfo regionInfo, final List<ExtendedCellScannable> cellIterables) {
096    this.cellScanner =
097      cellIterables == null ? null : PrivateCellUtil.createExtendedCellScanner(cellIterables);
098    this.regionInfo = null;
099  }
100
101  @Override
102  public boolean hasRegionInfo() {
103    return this.regionInfo != null;
104  }
105
106  @Override
107  public RegionInfo getRegionInfo() {
108    return this.regionInfo;
109  }
110
111  /** Returns One-shot cell scanner (you cannot back it up and restart) */
112  @Override
113  public ExtendedCellScanner cellScanner() {
114    return cellScanner;
115  }
116
117  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
118      justification = "The only possible race method is startCancel")
119  @Override
120  public void setCellScanner(final ExtendedCellScanner cellScanner) {
121    this.cellScanner = cellScanner;
122  }
123
124  @Override
125  public void setPriority(int priority) {
126    this.priority = Math.max(this.priority, priority);
127
128  }
129
130  @Override
131  public void setPriority(final TableName tn) {
132    setPriority(
133      tn != null && tn.isSystemTable() ? HConstants.SYSTEMTABLE_QOS : HConstants.NORMAL_QOS);
134  }
135
136  @Override
137  public int getPriority() {
138    return priority < 0 ? HConstants.NORMAL_QOS : priority;
139  }
140
141  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
142      justification = "The only possible race method is startCancel")
143  @Override
144  public void reset() {
145    priority = 0;
146    cellScanner = null;
147    exception = null;
148    callTimeout = null;
149    regionInfo = null;
150    tableName = null;
151    // In the implementations of some callable with replicas, rpc calls are executed in a executor
152    // and we could cancel the operation from outside which means there could be a race between
153    // reset and startCancel. Although I think the race should be handled by the callable since the
154    // reset may clear the cancel state...
155    synchronized (this) {
156      done = false;
157      cancelled = false;
158      cancellationCbs.clear();
159    }
160  }
161
162  @Override
163  public int getCallTimeout() {
164    return callTimeout != null ? callTimeout : 0;
165  }
166
167  @Override
168  public void setCallTimeout(int callTimeout) {
169    this.callTimeout = callTimeout;
170  }
171
172  @Override
173  public boolean hasCallTimeout() {
174    return callTimeout != null;
175  }
176
177  @Override
178  public Map<String, byte[]> getRequestAttributes() {
179    return requestAttributes;
180  }
181
182  @Override
183  public void setRequestAttributes(Map<String, byte[]> requestAttributes) {
184    this.requestAttributes = requestAttributes;
185  }
186
187  @Override
188  public synchronized String errorText() {
189    if (!done || exception == null) {
190      return null;
191    }
192    return exception.getMessage();
193  }
194
195  @Override
196  public synchronized boolean failed() {
197    return done && this.exception != null;
198  }
199
200  @Override
201  public synchronized boolean isCanceled() {
202    return cancelled;
203  }
204
205  @Override
206  public void notifyOnCancel(RpcCallback<Object> callback) {
207    synchronized (this) {
208      if (done) {
209        return;
210      }
211      if (!cancelled) {
212        cancellationCbs.add(callback);
213        return;
214      }
215    }
216    // run it directly as we have already been cancelled.
217    callback.run(null);
218  }
219
220  @Override
221  public synchronized void setFailed(String reason) {
222    if (done) {
223      return;
224    }
225    done = true;
226    exception = new IOException(reason);
227  }
228
229  @Override
230  public synchronized void setFailed(IOException e) {
231    if (done) {
232      return;
233    }
234    done = true;
235    exception = e;
236  }
237
238  @Override
239  public synchronized IOException getFailed() {
240    return done ? exception : null;
241  }
242
243  @Override
244  public synchronized void setDone(ExtendedCellScanner cellScanner) {
245    if (done) {
246      return;
247    }
248    done = true;
249    this.cellScanner = cellScanner;
250  }
251
252  @Override
253  public void startCancel() {
254    // As said above in the comment of reset, the cancellationCbs maybe cleared by reset, so we need
255    // to copy it.
256    List<RpcCallback<Object>> cbs;
257    synchronized (this) {
258      if (done) {
259        return;
260      }
261      done = true;
262      cancelled = true;
263      cbs = new ArrayList<>(cancellationCbs);
264    }
265    for (RpcCallback<?> cb : cbs) {
266      cb.run(null);
267    }
268  }
269
270  @Override
271  public synchronized void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action)
272    throws IOException {
273    if (cancelled) {
274      action.run(true);
275    } else {
276      cancellationCbs.add(callback);
277      action.run(false);
278    }
279  }
280
281  @Override
282  public void setTableName(TableName tableName) {
283    this.tableName = tableName;
284  }
285
286  @Override
287  public TableName getTableName() {
288    return tableName;
289  }
290}