/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor.example;

import java.io.IOException;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.IntStream;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.math.IntMath;

/**
 * An example of implementing a counter for a write-heavy workload, i.e., one where reads are much
 * less frequent than writes.
 * <p>
 * We convert each increment into a put and do the aggregation at read time. Of course, the return
 * value of the increment is then meaningless.
 * <p>
 * Notice that this is only an example, so we do not handle most corner cases; for example, you
 * must provide a qualifier when doing a get.
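 * <p>
 * A minimal sketch of how the observer might be enabled on a table, assuming an existing
 * {@code Admin} instance named {@code admin} (the table and family names here are illustrative):
 *
 * <pre>
 * TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("counters"))
 *   .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
 *   .setCoprocessor(WriteHeavyIncrementObserver.class.getName()).build();
 * admin.createTable(td);
 * </pre>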
 */
@InterfaceAudience.Private
public class WriteHeavyIncrementObserver implements RegionCoprocessor, RegionObserver {

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

  @Override
  public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
    options.readAllVersions();
  }

  private Cell createCell(byte[] row, byte[] family, byte[] qualifier, long ts, long value) {
    return CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
      .setType(Cell.Type.Put).setFamily(family).setQualifier(qualifier).setTimestamp(ts)
      .setValue(Bytes.toBytes(value)).build();
  }

  private InternalScanner wrap(byte[] family, InternalScanner scanner) {
    return new InternalScanner() {

      private List<Cell> srcResult = new ArrayList<>();

      private byte[] row;

      private byte[] qualifier;

      private long timestamp;

      private long sum;

      @Override
      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
        boolean moreRows = scanner.next(srcResult, scannerContext);
        if (srcResult.isEmpty()) {
          if (!moreRows && row != null) {
            result.add(createCell(row, family, qualifier, timestamp, sum));
          }
          return moreRows;
        }
        Cell firstCell = srcResult.get(0);
        // Check for a row change first. All the cells in a batch come from the same row, so
        // checking the first one is enough.
        if (row == null) {
          row = CellUtil.cloneRow(firstCell);
          qualifier = CellUtil.cloneQualifier(firstCell);
        } else if (!CellUtil.matchingRows(firstCell, row)) {
          result.add(createCell(row, family, qualifier, timestamp, sum));
          row = CellUtil.cloneRow(firstCell);
          qualifier = CellUtil.cloneQualifier(firstCell);
          sum = 0;
        }
        srcResult.forEach(c -> {
          if (CellUtil.matchingQualifier(c, qualifier)) {
            sum += Bytes.toLong(c.getValueArray(), c.getValueOffset());
          } else {
            result.add(createCell(row, family, qualifier, timestamp, sum));
            qualifier = CellUtil.cloneQualifier(c);
            sum = Bytes.toLong(c.getValueArray(), c.getValueOffset());
          }
          timestamp = c.getTimestamp();
        });
        if (!moreRows) {
          result.add(createCell(row, family, qualifier, timestamp, sum));
        }
        srcResult.clear();
        return moreRows;
      }

      @Override
      public void close() throws IOException {
        scanner.close();
      }
    };
  }

  @Override
  public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
    return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
  }

  @Override
  public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
    CompactionRequest request) throws IOException {
    options.readAllVersions();
  }

  @Override
  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
    CompactionRequest request) throws IOException {
    return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
  }
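  // The in-memory (memstore) compaction hooks below get the same treatment as flush and
  // compaction above: keep every version and merge the partial sums, since each increment lives
  // as a separate cell version and version-limiting would drop increments before they are
  // aggregated.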
  @Override
  public void preMemStoreCompactionCompactScannerOpen(
    ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
    throws IOException {
    options.readAllVersions();
  }

  @Override
  public InternalScanner preMemStoreCompactionCompact(
    ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner)
    throws IOException {
    return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
  }

  @Override
  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result)
    throws IOException {
    Scan scan =
      new Scan().withStartRow(get.getRow()).withStopRow(get.getRow(), true).readAllVersions();
    NavigableMap<byte[], NavigableMap<byte[], MutableLong>> sums =
      new TreeMap<>(Bytes.BYTES_COMPARATOR);
    get.getFamilyMap().forEach((cf, cqs) -> {
      NavigableMap<byte[], MutableLong> ss = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      sums.put(cf, ss);
      cqs.forEach(cq -> {
        ss.put(cq, new MutableLong(0));
        scan.addColumn(cf, cq);
      });
    });
    List<Cell> cells = new ArrayList<>();
    try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
      boolean moreRows;
      do {
        moreRows = scanner.next(cells);
        for (Cell cell : cells) {
          byte[] family = CellUtil.cloneFamily(cell);
          byte[] qualifier = CellUtil.cloneQualifier(cell);
          long value = Bytes.toLong(cell.getValueArray(), cell.getValueOffset());
          sums.get(family).get(qualifier).add(value);
        }
        cells.clear();
      } while (moreRows);
    }
    sums.forEach((cf, m) -> m.forEach((cq, s) -> result
      .add(createCell(get.getRow(), cf, cq, HConstants.LATEST_TIMESTAMP, s.longValue()))));
    c.bypass();
  }

  private final int mask;
  private final MutableLong[] lastTimestamps;
  {
    int stripes =
      1 << IntMath.log2(Runtime.getRuntime().availableProcessors(), RoundingMode.CEILING);
    lastTimestamps =
      IntStream.range(0, stripes).mapToObj(i -> new MutableLong()).toArray(MutableLong[]::new);
    mask = stripes - 1;
  }

  // We need to make sure that different puts use different timestamps, otherwise we may lose some
  // increments. This is a known issue for HBase.
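  // The per-stripe value packs the wall-clock time in milliseconds into the high bits (shifted
  // left by 10) and keeps the low 10 bits as a counter, so up to 1024 distinct timestamps can be
  // handed out within one millisecond per stripe (beyond that, the counter carries into the time
  // bits, which keeps timestamps unique). Rows are hashed onto a power-of-two number of stripes
  // so that unrelated rows do not contend on the same lock.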
  private long getUniqueTimestamp(byte[] row) {
    int slot = Bytes.hashCode(row) & mask;
    MutableLong lastTimestamp = lastTimestamps[slot];
    long now = EnvironmentEdgeManager.currentTime();
    synchronized (lastTimestamp) {
      long pt = lastTimestamp.longValue() >> 10;
      if (now > pt) {
        lastTimestamp.setValue(now << 10);
      } else {
        lastTimestamp.increment();
      }
      return lastTimestamp.longValue();
    }
  }

  @Override
  public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
    throws IOException {
    byte[] row = increment.getRow();
    Put put = new Put(row);
    long ts = getUniqueTimestamp(row);
    for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) {
      for (Cell cell : entry.getValue()) {
        put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
          .setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
          .setQualifier(cell.getQualifierArray(), cell.getQualifierOffset(),
            cell.getQualifierLength())
          .setValue(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
          .setType(Cell.Type.Put).setTimestamp(ts).build());
      }
    }
    c.getEnvironment().getRegion().put(put);
    c.bypass();
    return Result.EMPTY_RESULT;
  }

  @Override
  public void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Store store,
    ScanOptions options) throws IOException {
    options.readAllVersions();
  }
}