/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;

/**
 * The implementation of a region based coprocessor rpc channel.
 * <p>
 * Each {@link #callMethod} invocation locates the region containing {@link #row} in
 * {@link #tableName} through the connection's single-request caller and forwards the coprocessor
 * service request to it. When an expected {@link #region} was supplied, the call is rejected if the
 * located region has a different region name.
 */
@InterfaceAudience.Private
class RegionCoprocessorRpcChannelImpl implements RpcChannel {

  /** Connection whose caller factory is used to issue the request. */
  private final AsyncConnectionImpl conn;

  /** Table the target row belongs to. */
  private final TableName tableName;

  /** Region the caller expects to hit; {@code null} disables the region-name check. */
  private final RegionInfo region;

  /** Row used both to locate the region and to build the coprocessor request. */
  private final byte[] row;

  /** Per-rpc timeout, in nanoseconds. */
  private final long rpcTimeoutNs;

  /** Overall operation timeout, in nanoseconds. */
  private final long operationTimeoutNs;

  /**
   * Creates a channel bound to one row of one table.
   * @param conn               connection providing the caller factory
   * @param tableName          table that contains {@code row}
   * @param region             region the call is expected to land on, or {@code null} to skip the
   *                           region-name check
   * @param row                row used to locate the target region
   * @param rpcTimeoutNs       rpc timeout in nanoseconds
   * @param operationTimeoutNs operation timeout in nanoseconds
   */
  RegionCoprocessorRpcChannelImpl(AsyncConnectionImpl conn, TableName tableName, RegionInfo region,
    byte[] row, long rpcTimeoutNs, long operationTimeoutNs) {
    this.conn = conn;
    this.tableName = tableName;
    this.region = region;
    this.row = row;
    this.rpcTimeoutNs = rpcTimeoutNs;
    this.operationTimeoutNs = operationTimeoutNs;
  }

  /**
   * Sends one coprocessor service request to the located region.
   * @param method            descriptor of the service method being invoked
   * @param request           request message for {@code method}
   * @param responsePrototype prototype used to decode the response
   * @param controller        controller for this rpc attempt; inspected for failure in the callback
   * @param loc               location the request is sent to
   * @param stub              client service stub used to execute the request
   * @return a future completed with the decoded response, or completed exceptionally when the
   *         located region does not match the expected one, the controller reports a failure, or
   *         decoding the response throws an {@link IOException}
   */
  private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
    Message responsePrototype, HBaseRpcController controller, HRegionLocation loc,
    ClientService.Interface stub) {
    // Captured here so the response callback can restore the caller's context.
    final Context context = Context.current();
    CompletableFuture<Message> future = new CompletableFuture<>();
    // Read the located region once; getRegion() supersedes the deprecated getRegionInfo().
    final RegionInfo located = loc.getRegion();
    final byte[] locatedRegionName = located.getRegionName();
    if (region != null && !Bytes.equals(locatedRegionName, region.getRegionName())) {
      future.completeExceptionally(new DoNotRetryIOException(
        "Region name is changed, expected " + region.getRegionNameAsString() + ", actual "
          + located.getRegionNameAsString()));
      return future;
    }
    CoprocessorServiceRequest csr =
      CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request, row, locatedRegionName);
    stub.execService(controller, csr, resp -> {
      try (Scope ignored = context.makeCurrent()) {
        if (controller.failed()) {
          future.completeExceptionally(controller.getFailed());
        } else {
          try {
            future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
          } catch (IOException e) {
            future.completeExceptionally(e);
          }
        }
      }
    });
    return future;
  }

  /**
   * Records {@code error} on the caller supplied controller.
   * <p>
   * A {@link ClientCoprocessorRpcController} receives the throwable itself. Any other non-null
   * controller receives its string form through the generic {@link RpcController#setFailed(String)}.
   * A {@code null} controller is ignored so that the completion callback can still be invoked.
   */
  private static void reportFailure(RpcController controller, Throwable error) {
    if (controller instanceof ClientCoprocessorRpcController) {
      ((ClientCoprocessorRpcController) controller).setFailed(error);
    } else if (controller != null) {
      controller.setFailed(error.toString());
    }
  }

  /**
   * Locates the region for {@link #row} and invokes {@code method} on it.
   * <p>
   * The region is looked up with {@link RegionLocateType#CURRENT} and the configured rpc and
   * operation timeouts are applied. When the call finishes, a failure (if any) is recorded on
   * {@code controller} and {@code done} is run with the response, which is {@code null} on failure.
   * @param method            descriptor of the service method being invoked
   * @param controller        controller that receives the failure, if the call fails
   * @param request           request message for {@code method}
   * @param responsePrototype prototype used to decode the response
   * @param done              callback run once the call has completed
   */
  @Override
  public void callMethod(MethodDescriptor method, RpcController controller, Message request,
    Message responsePrototype, RpcCallback<Message> done) {
    // Captured here so both lambdas below can restore the caller's context.
    final Context context = Context.current();
    addListener(
      conn.callerFactory.<Message> single().table(tableName).row(row)
        .locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS).action((c, l, s) -> {
          try (Scope ignored = context.makeCurrent()) {
            return rpcCall(method, request, responsePrototype, c, l, s);
          }
        }).call(),
      (r, e) -> {
        try (Scope ignored = context.makeCurrent()) {
          if (e != null) {
            // Guarded instead of a blind downcast: a throw here would skip done.run(r) below.
            reportFailure(controller, e);
          }
          done.run(r);
        }
      });
  }
}