001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.createCloseRowBefore;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.Map;
027import org.apache.hadoop.hbase.DoNotRetryIOException;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.HRegionLocation;
030import org.apache.hadoop.hbase.RegionLocations;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.TableNotEnabledException;
033import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
034import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.Pair;
037import org.apache.yetus.audience.InterfaceAudience;
038
039/**
040 * A reversed ScannerCallable which supports backward scanning.
041 */
042@InterfaceAudience.Private
043public class ReversedScannerCallable extends ScannerCallable {
044
045  private byte[] locationSearchKey;
046
047  /**
048   * @param connection  which connection
049   * @param tableName   table callable is on
050   * @param scan        the scan to execute
051   * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect
052   *                    metrics
053   * @param rpcFactory  to create an {@link com.google.protobuf.RpcController} to talk to the
054   *                    regionserver
055   * @param replicaId   the replica id
056   */
057  public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
058    ScanMetrics scanMetrics, RpcControllerFactory rpcFactory, int replicaId,
059    Map<String, byte[]> requestAttributes) {
060    super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId, requestAttributes);
061  }
062
063  @Override
064  public void throwable(Throwable t, boolean retrying) {
065    // for reverse scans, we need to update cache using the search key found for the reverse scan
066    // range in prepare. Otherwise, we will see weird behavior at the table boundaries,
067    // when trying to clear cache for an empty row.
068    if (location != null && locationSearchKey != null) {
069      getConnection().updateCachedLocations(getTableName(),
070        location.getRegionInfo().getRegionName(), locationSearchKey, t, location.getServerName());
071    }
072  }
073
074  /**
075   * @param reload force reload of server location
076   */
077  @Override
078  public void prepare(boolean reload) throws IOException {
079    if (Thread.interrupted()) {
080      throw new InterruptedIOException();
081    }
082
083    if (
084      reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME)
085        && getConnection().isTableDisabled(getTableName())
086    ) {
087      throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled.");
088    }
089
090    if (!instantiated || reload) {
091      // we should use range locate if
092      // 1. we do not want the start row
093      // 2. the start row is empty which means we need to locate to the last region.
094      if (scan.includeStartRow() && !isEmptyStartRow(getRow())) {
095        // Just locate the region with the row
096        RegionLocations rl = getRegionLocationsForPrepare(getRow());
097        this.location = getLocationForReplica(rl);
098        this.locationSearchKey = getRow();
099      } else {
100        // The locateStart row is an approximation. So we need to search between
101        // that and the actual row in order to really find the last region
102        byte[] locateStartRow = createCloseRowBefore(getRow());
103        Pair<HRegionLocation, byte[]> lastRegionAndKey =
104          locateLastRegionInRange(locateStartRow, getRow());
105        this.location = lastRegionAndKey.getFirst();
106        this.locationSearchKey = lastRegionAndKey.getSecond();
107      }
108
109      if (location == null || location.getServerName() == null) {
110        throw new IOException("Failed to find location, tableName=" + getTableName() + ", row="
111          + Bytes.toStringBinary(getRow()) + ", reload=" + reload);
112      }
113
114      setStub(getConnection().getClient(getLocation().getServerName()));
115      checkIfRegionServerIsRemote();
116      instantiated = true;
117    }
118
119    // check how often we retry.
120    if (reload) {
121      incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
122    }
123  }
124
125  /**
126   * Get the last region before the endkey, which will be used to execute the reverse scan
127   * @param startKey Starting row in range, inclusive
128   * @param endKey   Ending row in range, exclusive
129   * @return The last location, and the rowKey used to find it. May be null, if a region could not
130   *         be found.
131   */
132  private Pair<HRegionLocation, byte[]> locateLastRegionInRange(byte[] startKey, byte[] endKey)
133    throws IOException {
134    final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
135    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
136      throw new IllegalArgumentException(
137        "Invalid range: " + Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey));
138    }
139
140    HRegionLocation lastRegion = null;
141    byte[] lastFoundKey = null;
142    byte[] currentKey = startKey;
143
144    do {
145      RegionLocations rl = getRegionLocationsForPrepare(currentKey);
146      HRegionLocation regionLocation = getLocationForReplica(rl);
147      if (regionLocation.getRegionInfo().containsRow(currentKey)) {
148        lastFoundKey = currentKey;
149        lastRegion = regionLocation;
150      } else {
151        throw new DoNotRetryIOException(
152          "Does hbase:meta exist hole? Locating row " + Bytes.toStringBinary(currentKey)
153            + " returns incorrect region " + regionLocation.getRegionInfo());
154      }
155      currentKey = regionLocation.getRegionInfo().getEndKey();
156    } while (
157      !Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
158        && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0)
159    );
160
161    return new Pair<>(lastRegion, lastFoundKey);
162  }
163
164  @Override
165  public ScannerCallable getScannerCallableForReplica(int id) {
166    ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), getTableName(),
167      this.getScan(), this.scanMetrics, rpcControllerFactory, id, requestAttributes);
168    r.setCaching(this.getCaching());
169    return r;
170  }
171}