001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.Map;
023import org.apache.hadoop.hbase.ServerName;
024import org.apache.hadoop.hbase.TableName;
025import org.apache.yetus.audience.InterfaceAudience;
026
027import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
028
029import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
030
031/**
032 * This class is used to unify HTable calls with AsyncProcess Framework. HTable can use AsyncProcess
033 * directly though this class. Also adds global timeout tracking on top of RegionServerCallable and
034 * implements Cancellable. Global timeout tracking conflicts with logic in RpcRetryingCallerImpl's
035 * callWithRetries. So you can only use this callable in AsyncProcess which only uses
036 * callWithoutRetries and retries in its own implementation.
037 */
038@InterfaceAudience.Private
039abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<T>
040  implements Cancellable {
041  private final RetryingTimeTracker tracker;
042  private final int rpcTimeout;
043
044  CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row,
045    RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker, int priority,
046    Map<String, byte[]> requestAttributes) {
047    super(connection, tableName, row, rpcController, priority, requestAttributes);
048    this.rpcTimeout = rpcTimeout;
049    this.tracker = tracker;
050  }
051
052  /*
053   * Override so can mess with the callTimeout. (non-Javadoc)
054   * @see org.apache.hadoop.hbase.client.RegionServerCallable#rpcCall(int)
055   */
056  @Override
057  public T call(int operationTimeout) throws IOException {
058    if (isCancelled()) return null;
059    if (Thread.interrupted()) {
060      throw new InterruptedIOException();
061    }
062    // It is expected (it seems) that tracker.start can be called multiple times (on each trip
063    // through the call when retrying). Also, we can call start and no need of a stop.
064    this.tracker.start();
065    int remainingTime = tracker.getRemainingTime(operationTimeout);
066    if (remainingTime <= 1) {
067      // "1" is a special return value in RetryingTimeTracker, see its implementation.
068      throw new OperationTimeoutExceededException(
069        "Timeout exceeded before call began. Meta requests may be slow, the operation "
070          + "timeout is too short for the number of requests, or the configured retries "
071          + "can't complete in the operation timeout.");
072    }
073    return super.call(Math.min(rpcTimeout, remainingTime));
074  }
075
076  @Override
077  public void prepare(boolean reload) throws IOException {
078    if (isCancelled()) return;
079    if (Thread.interrupted()) {
080      throw new InterruptedIOException();
081    }
082    super.prepare(reload);
083  }
084
085  @Override
086  protected void setStubByServiceName(ServerName serviceName) throws IOException {
087    setStub(getConnection().getClient(serviceName));
088  }
089
090  @Override
091  public void cancel() {
092    getRpcController().startCancel();
093  }
094
095  @Override
096  public boolean isCancelled() {
097    return getRpcController().isCanceled();
098  }
099
100  protected ClientProtos.MultiResponse doMulti(ClientProtos.MultiRequest request)
101    throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
102    return getStub().multi(getRpcController(), request);
103  }
104
105  protected ClientProtos.ScanResponse doScan(ClientProtos.ScanRequest request)
106    throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
107    return getStub().scan(getRpcController(), request);
108  }
109
110  protected ClientProtos.PrepareBulkLoadResponse
111    doPrepareBulkLoad(ClientProtos.PrepareBulkLoadRequest request)
112      throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
113    return getStub().prepareBulkLoad(getRpcController(), request);
114  }
115
116  protected ClientProtos.BulkLoadHFileResponse
117    doBulkLoadHFile(ClientProtos.BulkLoadHFileRequest request)
118      throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
119    return getStub().bulkLoadHFile(getRpcController(), request);
120  }
121
122  protected ClientProtos.CleanupBulkLoadResponse
123    doCleanupBulkLoad(ClientProtos.CleanupBulkLoadRequest request)
124      throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
125    return getStub().cleanupBulkLoad(getRpcController(), request);
126  }
127
128  RetryingTimeTracker getTracker() {
129    return tracker;
130  }
131}