001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import com.google.protobuf.ByteString; 021import com.google.protobuf.Message; 022import com.google.protobuf.RpcCallback; 023import com.google.protobuf.RpcController; 024import com.google.protobuf.Service; 025import java.io.IOException; 026import java.lang.reflect.InvocationTargetException; 027import java.lang.reflect.Method; 028import java.util.Collections; 029import org.apache.hadoop.hbase.CoprocessorEnvironment; 030import org.apache.hadoop.hbase.HBaseInterfaceAudience; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 033import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessRequest; 034import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessResponse; 035import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService; 036import org.apache.hadoop.hbase.regionserver.Region; 037import org.apache.hadoop.hbase.regionserver.RowProcessor; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.apache.yetus.audience.InterfaceStability; 040 041/** 042 * This class demonstrates how to implement atomic read-modify-writes using 043 * {@link Region#processRowsWithLocks} and Coprocessor endpoints. 044 */ 045@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) 046@InterfaceStability.Evolving 047public abstract class BaseRowProcessorEndpoint<S extends Message, T extends Message> 048 extends RowProcessorService implements RegionCoprocessor { 049 private RegionCoprocessorEnvironment env; 050 051 /** 052 * Pass a processor to region to process multiple rows atomically. The RowProcessor 053 * implementations should be the inner classes of your RowProcessorEndpoint. This way the 054 * RowProcessor can be class-loaded with the Coprocessor endpoint together. See 055 * {@code TestRowProcessorEndpoint} for example. The request contains information for constructing 056 * processor (see {@link #constructRowProcessorFromRequest}. The processor object defines the 057 * read-modify-write procedure. 058 */ 059 @Override 060 public void process(RpcController controller, ProcessRequest request, 061 RpcCallback<ProcessResponse> done) { 062 ProcessResponse resultProto = null; 063 try { 064 RowProcessor<S, T> processor = constructRowProcessorFromRequest(request); 065 Region region = env.getRegion(); 066 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; 067 long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE; 068 region.processRowsWithLocks(processor, nonceGroup, nonce); 069 T result = processor.getResult(); 070 ProcessResponse.Builder b = ProcessResponse.newBuilder(); 071 b.setRowProcessorResult(result.toByteString()); 072 resultProto = b.build(); 073 } catch (Exception e) { 074 CoprocessorRpcUtils.setControllerException(controller, new IOException(e)); 075 } 076 done.run(resultProto); 077 } 078 079 @Override 080 public Iterable<Service> getServices() { 081 return Collections.singleton(this); 082 } 083 084 /** 085 * Stores a reference to the coprocessor environment provided by the 086 * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this 087 * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded on 088 * a table region, so always expects this to be an instance of 089 * {@link RegionCoprocessorEnvironment}. 090 * @param env the environment provided by the coprocessor host 091 * @throws IOException if the provided environment is not an instance of 092 * {@code RegionCoprocessorEnvironment} 093 */ 094 @Override 095 public void start(CoprocessorEnvironment env) throws IOException { 096 if (env instanceof RegionCoprocessorEnvironment) { 097 this.env = (RegionCoprocessorEnvironment) env; 098 } else { 099 throw new CoprocessorException("Must be loaded on a table region!"); 100 } 101 } 102 103 @Override 104 public void stop(CoprocessorEnvironment env) throws IOException { 105 // nothing to do 106 } 107 108 @SuppressWarnings("unchecked") 109 RowProcessor<S, T> constructRowProcessorFromRequest(ProcessRequest request) throws IOException { 110 String className = request.getRowProcessorClassName(); 111 Class<?> cls; 112 try { 113 cls = Class.forName(className); 114 RowProcessor<S, T> ci = (RowProcessor<S, T>) cls.getDeclaredConstructor().newInstance(); 115 if (request.hasRowProcessorInitializerMessageName()) { 116 Class<?> imn = 117 Class.forName(request.getRowProcessorInitializerMessageName()).asSubclass(Message.class); 118 Method m; 119 try { 120 m = imn.getMethod("parseFrom", ByteString.class); 121 } catch (SecurityException e) { 122 throw new IOException(e); 123 } catch (NoSuchMethodException e) { 124 throw new IOException(e); 125 } 126 S s; 127 try { 128 s = (S) m.invoke(null, request.getRowProcessorInitializerMessage()); 129 } catch (IllegalArgumentException e) { 130 throw new IOException(e); 131 } catch (InvocationTargetException e) { 132 throw new IOException(e); 133 } 134 ci.initialize(s); 135 } 136 return ci; 137 } catch (Exception e) { 138 throw new IOException(e); 139 } 140 } 141}