/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor.example;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;

import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.RowCountProtos.CountRequest;
import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.RowCountProtos.CountResponse;
import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.RowCountProtos.RowCountService;

/**
 * Example region coprocessor endpoint that implements {@link RowCountService}, offering two RPCs:
 * one that counts rows and one that counts cells in the hosting region.
 * <p>
 * For the protocol buffer definition of the RowCountService, see the source file located under
 * hbase-examples/src/main/protobuf/Examples.proto.
 * </p>
 */
@InterfaceAudience.Private
public class RowCountEndpoint extends RowCountService implements RegionCoprocessor {
  /** Region environment captured in {@link #start(CoprocessorEnvironment)}. */
  private RegionCoprocessorEnvironment env;

  public RowCountEndpoint() {
  }

  /**
   * Exposes this instance, which is itself the {@link RowCountService} implementation, as the
   * single service provided by the coprocessor.
   */
  @Override
  public Iterable<Service> getServices() {
    return Collections.singleton(this);
  }

  /**
   * Counts the rows of the region this coprocessor is loaded on.
   * <p>
   * The region is scanned with a {@link FirstKeyOnlyFilter}; a row is counted whenever the row key
   * of a returned cell differs from the row key of the cell seen just before it. If the scan
   * raises an {@link IOException}, the exception is recorded on the controller and the callback
   * receives {@code null}.
   * </p>
   * @param controller receives the exception if the scan fails
   * @param request the count request (not inspected)
   * @param done callback invoked with the response, or with {@code null} on failure
   */
  @Override
  public void getRowCount(RpcController controller, CountRequest request,
    RpcCallback<CountResponse> done) {
    Scan firstCellScan = new Scan();
    firstCellScan.setFilter(new FirstKeyOnlyFilter());
    InternalScanner regionScanner = null;
    CountResponse reply = null;
    try {
      regionScanner = env.getRegion().getScanner(firstCellScan);
      List<Cell> batch = new ArrayList<>();
      byte[] previousRow = null;
      long rows = 0;
      // Always fetch at least one batch; stop once the scanner reports nothing further.
      boolean moreToScan = true;
      while (moreToScan) {
        moreToScan = regionScanner.next(batch);
        for (Cell cell : batch) {
          byte[] row = CellUtil.cloneRow(cell);
          boolean sameAsPrevious = previousRow != null && Bytes.equals(previousRow, row);
          if (!sameAsPrevious) {
            previousRow = row;
            rows++;
          }
        }
        batch.clear();
      }
      reply = CountResponse.newBuilder().setCount(rows).build();
    } catch (IOException ioe) {
      CoprocessorRpcUtils.setControllerException(controller, ioe);
    } finally {
      if (regionScanner != null) {
        IOUtils.closeQuietly(regionScanner);
      }
    }
    done.run(reply);
  }

  /**
   * Counts every cell returned by an unfiltered scan of the region this coprocessor is loaded on.
   * <p>
   * If the scan raises an {@link IOException}, the exception is recorded on the controller and the
   * callback receives {@code null}.
   * </p>
   * @param controller receives the exception if the scan fails
   * @param request the count request (not inspected)
   * @param done callback invoked with the response, or with {@code null} on failure
   */
  @Override
  public void getKeyValueCount(RpcController controller, CountRequest request,
    RpcCallback<CountResponse> done) {
    InternalScanner regionScanner = null;
    CountResponse reply = null;
    try {
      regionScanner = env.getRegion().getScanner(new Scan());
      List<Cell> batch = new ArrayList<>();
      long cells = 0;
      // Always fetch at least one batch; stop once the scanner reports nothing further.
      boolean moreToScan = true;
      while (moreToScan) {
        moreToScan = regionScanner.next(batch);
        cells += Iterables.size(batch);
        batch.clear();
      }
      reply = CountResponse.newBuilder().setCount(cells).build();
    } catch (IOException ioe) {
      CoprocessorRpcUtils.setControllerException(controller, ioe);
    } finally {
      if (regionScanner != null) {
        IOUtils.closeQuietly(regionScanner);
      }
    }
    done.run(reply);
  }

  /**
   * Keeps the coprocessor environment handed over by the
   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} of the region where this
   * coprocessor is loaded. As an endpoint it is meant to run on a table region, so the environment
   * has to be a {@link RegionCoprocessorEnvironment}.
   * @param env the environment provided by the coprocessor host
   * @throws IOException if the provided environment is not an instance of
   *                     {@code RegionCoprocessorEnvironment}
   */
  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (!(env instanceof RegionCoprocessorEnvironment)) {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
    this.env = (RegionCoprocessorEnvironment) env;
  }

  /**
   * No-op: this endpoint has nothing to release on shutdown.
   * @param env the environment provided by the coprocessor host (unused)
   */
  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    // nothing to do
  }
}