/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.createCloseRowBefore;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Map;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * A {@link ScannerCallable} specialization used for reverse (backward) scans.
 */
@InterfaceAudience.Private
public class ReversedScannerCallable extends ScannerCallable {

  /**
   * Row key that {@link #prepare(boolean)} used to look up {@code location}. It is handed to the
   * connection in {@link #throwable(Throwable, boolean)} when cached locations are updated.
   */
  private byte[] locationSearchKey;

  /**
   * Creates a reversed callable; every argument is forwarded unchanged to the superclass.
   * @param connection        which connection
   * @param tableName         table callable is on
   * @param scan              the scan to execute
   * @param scanMetrics       the ScanMetrics to use; if it is null, ScannerCallable won't collect
   *                          metrics
   * @param rpcFactory        to create an {@link com.google.protobuf.RpcController} to talk to
   *                          the regionserver
   * @param replicaId         the replica id
   * @param requestAttributes request attributes, passed through to the superclass constructor
   */
  public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
    ScanMetrics scanMetrics, RpcControllerFactory rpcFactory, int replicaId,
    Map<String, byte[]> requestAttributes) {
    super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId, requestAttributes);
  }

  /**
   * Reports a failure to the connection so that it can update its cached locations.
   * @param t        the failure that occurred
   * @param retrying not consulted by this implementation
   */
  @Override
  public void throwable(Throwable t, boolean retrying) {
    // Nothing to report until prepare has resolved both a location and its search key.
    if (location == null || locationSearchKey == null) {
      return;
    }
    // A reverse scan has to report the key that prepare actually searched with, not the scan
    // row. Otherwise we see weird behavior at the table boundaries, when trying to clear the
    // cache for an empty row.
    getConnection().updateCachedLocations(getTableName(),
      location.getRegionInfo().getRegionName(), locationSearchKey, t, location.getServerName());
  }

  /**
   * Resolves the region location for this reverse scan and sets up the stub used to reach it.
   * @param reload force reload of server location
   * @throws InterruptedIOException   if the calling thread has been interrupted
   * @throws TableNotEnabledException if reloading and the connection reports the table disabled
   * @throws IOException              if no usable location could be found
   */
  @Override
  public void prepare(boolean reload) throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }

    // The disabled-table check only runs on reload, and never for the meta table.
    final boolean checkDisabled =
      reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME);
    if (checkDisabled && getConnection().isTableDisabled(getTableName())) {
      throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled.");
    }

    if (reload || !instantiated) {
      resolveReverseScanLocation(reload);
    }

    // check how often we retry.
    if (reload) {
      incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
    }
  }

  /**
   * Looks up the location to scan, records the key used for the lookup, and installs the client
   * stub for the resolved server.
   * @param reload only used when composing the failure message
   * @throws IOException if the lookup fails or yields no server
   */
  private void resolveReverseScanLocation(boolean reload) throws IOException {
    final byte[] row = getRow();
    // A range locate is needed if
    // 1. we do not want the start row, or
    // 2. the start row is empty, which means we need to locate to the last region.
    final boolean rangeLocate = !scan.includeStartRow() || isEmptyStartRow(row);
    if (rangeLocate) {
      // The row produced by createCloseRowBefore is an approximation, so search between that
      // and the actual row in order to really find the last region.
      Pair<HRegionLocation, byte[]> found =
        locateLastRegionInRange(createCloseRowBefore(row), row);
      this.location = found.getFirst();
      this.locationSearchKey = found.getSecond();
    } else {
      // Just locate the region with the row.
      this.location = getLocationForReplica(getRegionLocationsForPrepare(row));
      this.locationSearchKey = row;
    }

    if (location == null || location.getServerName() == null) {
      throw new IOException("Failed to find location, tableName=" + getTableName() + ", row="
        + Bytes.toStringBinary(getRow()) + ", reload=" + reload);
    }

    setStub(getConnection().getClient(getLocation().getServerName()));
    checkIfRegionServerIsRemote();
    instantiated = true;
  }

  /**
   * Walks the regions from {@code startKey} towards {@code endKey} and returns the last one
   * visited, which is the region the reverse scan will be executed against.
   * @param startKey Starting row in range, inclusive
   * @param endKey   Ending row in range, exclusive; an empty end row means "end of table"
   * @return the last location visited, paired with the row key that was used to find it
   * @throws IllegalArgumentException if {@code startKey} sorts after a non-empty {@code endKey}
   * @throws DoNotRetryIOException    if a lookup returns a region that does not contain the key
   *                                  it was looked up with
   */
  private Pair<HRegionLocation, byte[]> locateLastRegionInRange(byte[] startKey, byte[] endKey)
    throws IOException {
    final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
      throw new IllegalArgumentException(
        "Invalid range: " + Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey));
    }

    byte[] probeKey = startKey;
    while (true) {
      HRegionLocation candidate = getLocationForReplica(getRegionLocationsForPrepare(probeKey));
      if (!candidate.getRegionInfo().containsRow(probeKey)) {
        throw new DoNotRetryIOException(
          "Does hbase:meta exist hole? Locating row " + Bytes.toStringBinary(probeKey)
            + " returns incorrect region " + candidate.getRegionInfo());
      }

      // Step to the next region; stop once the table end or the requested end key is reached.
      byte[] nextKey = candidate.getRegionInfo().getEndKey();
      if (
        Bytes.equals(nextKey, HConstants.EMPTY_END_ROW)
          || (!endKeyIsEndOfTable && Bytes.compareTo(nextKey, endKey) >= 0)
      ) {
        return new Pair<>(candidate, probeKey);
      }
      probeKey = nextKey;
    }
  }

  /**
   * Builds a new reversed callable for the given replica that shares this callable's connection,
   * table, scan, metrics, controller factory, request attributes and caching value.
   * @param id the replica id for the new callable
   * @return the newly created callable
   */
  @Override
  public ScannerCallable getScannerCallableForReplica(int id) {
    ReversedScannerCallable replicaCallable =
      new ReversedScannerCallable(getConnection(), getTableName(), this.getScan(),
        this.scanMetrics, rpcControllerFactory, id, requestAttributes);
    replicaCallable.setCaching(this.getCaching());
    return replicaCallable;
  }
}