001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor.example.row.stats; 019 020import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.CF; 021import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.NAMESPACE; 022import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.NAMESPACED_TABLE_NAME; 023import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.TABLE_RECORDER_KEY; 024 025import java.io.IOException; 026import java.time.Duration; 027import java.util.Arrays; 028import java.util.List; 029import java.util.Optional; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.ConcurrentMap; 032import java.util.stream.Collectors; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellUtil; 035import org.apache.hadoop.hbase.CoprocessorEnvironment; 036import org.apache.hadoop.hbase.ExtendedCell; 037import org.apache.hadoop.hbase.NamespaceDescriptor; 038import org.apache.hadoop.hbase.RawCellBuilder; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 043import org.apache.hadoop.hbase.client.TableDescriptor; 044import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 045import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; 046import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; 047import org.apache.hadoop.hbase.coprocessor.MasterObserver; 048import org.apache.hadoop.hbase.coprocessor.ObserverContext; 049import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 050import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 051import org.apache.hadoop.hbase.coprocessor.RegionObserver; 052import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.RowStatisticsRecorder; 053import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.RowStatisticsTableRecorder; 054import org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsUtil; 055import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; 056import org.apache.hadoop.hbase.metrics.Counter; 057import org.apache.hadoop.hbase.regionserver.InternalScanner; 058import org.apache.hadoop.hbase.regionserver.ScanType; 059import org.apache.hadoop.hbase.regionserver.ScannerContext; 060import org.apache.hadoop.hbase.regionserver.Shipper; 061import org.apache.hadoop.hbase.regionserver.Store; 062import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 063import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 064import org.apache.yetus.audience.InterfaceAudience; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068@InterfaceAudience.Private 069public class RowStatisticsCompactionObserver 070 implements RegionCoprocessor, RegionObserver, MasterCoprocessor, MasterObserver { 071 072 private static final Logger LOG = LoggerFactory.getLogger(RowStatisticsCompactionObserver.class); 073 074 // From private field BucketAllocator.DEFAULT_BUCKET_SIZES 075 private static final long DEFAULT_MAX_BUCKET_SIZE = 512 * 1024 + 1024; 076 private static final ConcurrentMap<TableName, Long> TABLE_COUNTERS = new ConcurrentHashMap(); 077 private static final String ROW_STATISTICS_DROPPED = "rowStatisticsDropped"; 078 private static final String ROW_STATISTICS_PUT_FAILED = "rowStatisticsPutFailures"; 079 private Counter rowStatisticsDropped; 080 private Counter rowStatisticsPutFailed; 081 private long maxCacheSize; 082 private final RowStatisticsRecorder recorder; 083 084 @InterfaceAudience.Private 085 public RowStatisticsCompactionObserver(RowStatisticsRecorder recorder) { 086 this.recorder = recorder; 087 } 088 089 public RowStatisticsCompactionObserver() { 090 this(null); 091 } 092 093 @Override 094 public Optional<RegionObserver> getRegionObserver() { 095 return Optional.of(this); 096 } 097 098 @Override 099 public Optional<MasterObserver> getMasterObserver() { 100 return Optional.of(this); 101 } 102 103 @Override 104 public void start(CoprocessorEnvironment e) throws IOException { 105 if (!(e instanceof RegionCoprocessorEnvironment)) { 106 return; 107 } 108 RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment) e; 109 if (regionEnv.getRegionInfo().getTable().isSystemTable()) { 110 return; 111 } 112 String[] configuredBuckets = 113 regionEnv.getConfiguration().getStrings(BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY); 114 maxCacheSize = DEFAULT_MAX_BUCKET_SIZE; 115 if (configuredBuckets != null && configuredBuckets.length > 0) { 116 String lastBucket = configuredBuckets[configuredBuckets.length - 1]; 117 try { 118 maxCacheSize = Integer.parseInt(lastBucket.trim()); 119 } catch (NumberFormatException ex) { 120 LOG.warn("Failed to parse {} value {} as int", BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY, 121 lastBucket, ex); 122 } 123 } 124 rowStatisticsDropped = 125 regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_DROPPED); 126 rowStatisticsPutFailed = 127 regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_PUT_FAILED); 128 TableName tableName = regionEnv.getRegionInfo().getTable(); 129 TABLE_COUNTERS.merge(tableName, 1L, Long::sum); 130 } 131 132 @Override 133 public void stop(CoprocessorEnvironment e) throws IOException { 134 if (!(e instanceof RegionCoprocessorEnvironment)) { 135 return; 136 } 137 RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment) e; 138 if (regionEnv.getRegionInfo().getTable().isSystemTable()) { 139 return; 140 } 141 TableName tableName = regionEnv.getRegionInfo().getTable(); 142 long tableCount = TABLE_COUNTERS.merge(tableName, -1L, Long::sum); 143 if (tableCount == 0) { 144 long regionCount = 0; 145 for (long count : TABLE_COUNTERS.values()) { 146 regionCount += count; 147 } 148 if (regionCount == 0) { 149 regionEnv.getMetricRegistryForRegionServer().remove(ROW_STATISTICS_DROPPED, 150 rowStatisticsDropped); 151 regionEnv.getMetricRegistryForRegionServer().remove(ROW_STATISTICS_PUT_FAILED, 152 rowStatisticsPutFailed); 153 RowStatisticsTableRecorder tableRecorder = 154 (RowStatisticsTableRecorder) regionEnv.getSharedData().get(TABLE_RECORDER_KEY); 155 if (tableRecorder != null) { 156 regionEnv.getSharedData().remove(TABLE_RECORDER_KEY, tableRecorder); 157 tableRecorder.close(); 158 } 159 } 160 } 161 } 162 163 @Override 164 public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx) 165 throws IOException { 166 try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) { 167 if (admin.tableExists(NAMESPACED_TABLE_NAME)) { 168 LOG.info("Table {} already exists. Skipping table creation process.", 169 NAMESPACED_TABLE_NAME); 170 } else { 171 boolean shouldCreateNamespace = 172 Arrays.stream(admin.listNamespaces()).filter(namespace -> namespace.equals(NAMESPACE)) 173 .collect(Collectors.toUnmodifiableSet()).isEmpty(); 174 if (shouldCreateNamespace) { 175 NamespaceDescriptor nd = NamespaceDescriptor.create(NAMESPACE).build(); 176 try { 177 admin.createNamespace(nd); 178 } catch (IOException e) { 179 LOG.error("Failed to create namespace {}", NAMESPACE, e); 180 } 181 } 182 ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(CF).setMaxVersions(25) 183 .setTimeToLive((int) Duration.ofDays(7).toSeconds()).build(); 184 TableDescriptor td = 185 TableDescriptorBuilder.newBuilder(NAMESPACED_TABLE_NAME).setColumnFamily(cfd).build(); 186 LOG.info("Creating table {}", NAMESPACED_TABLE_NAME); 187 try { 188 admin.createTable(td); 189 } catch (IOException e) { 190 LOG.error("Failed to create table {}", NAMESPACED_TABLE_NAME, e); 191 } 192 } 193 } catch (IOException e) { 194 LOG.error("Failed to get Connection or Admin. Cannot determine if table {} exists.", 195 NAMESPACED_TABLE_NAME, e); 196 } 197 } 198 199 @Override 200 public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> context, 201 Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, 202 CompactionRequest request) { 203 if (store.getTableName().isSystemTable()) { 204 return scanner; 205 } 206 int blocksize = store.getColumnFamilyDescriptor().getBlocksize(); 207 boolean isMajor = request.isMajor(); 208 RowStatisticsImpl stats = new RowStatisticsImpl(store.getTableName().getNameAsString(), 209 store.getRegionInfo().getEncodedName(), store.getColumnFamilyName(), blocksize, maxCacheSize, 210 isMajor); 211 return new RowStatisticsScanner(scanner, stats, context.getEnvironment(), recorder); 212 } 213 214 private static class RowStatisticsScanner implements InternalScanner, Shipper { 215 216 private final InternalScanner scanner; 217 private final Shipper shipper; 218 private final RowStatisticsImpl rowStatistics; 219 private final RegionCoprocessorEnvironment regionEnv; 220 private final Counter rowStatisticsDropped; 221 private final Counter rowStatisticsPutFailed; 222 private final RowStatisticsRecorder customRecorder; 223 private RawCellBuilder cellBuilder; 224 private Cell lastCell; 225 226 public RowStatisticsScanner(InternalScanner scanner, RowStatisticsImpl rowStatistics, 227 RegionCoprocessorEnvironment regionEnv, RowStatisticsRecorder customRecorder) { 228 this.scanner = scanner; 229 if (scanner instanceof Shipper) { 230 this.shipper = (Shipper) scanner; 231 } else { 232 this.shipper = null; 233 } 234 this.rowStatistics = rowStatistics; 235 this.regionEnv = regionEnv; 236 this.cellBuilder = regionEnv.getCellBuilder(); 237 this.rowStatisticsDropped = 238 regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_DROPPED); 239 this.rowStatisticsPutFailed = 240 regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_PUT_FAILED); 241 this.customRecorder = customRecorder; 242 } 243 244 @Override 245 public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext) 246 throws IOException { 247 boolean ret = scanner.next(result, scannerContext); 248 consumeCells(result); 249 return ret; 250 } 251 252 @Override 253 public boolean next(List<? super ExtendedCell> result) throws IOException { 254 boolean ret = scanner.next(result); 255 consumeCells(result); 256 return ret; 257 } 258 259 @Override 260 public void close() throws IOException { 261 rowStatistics.handleRowChanged(lastCell); 262 rowStatistics.shipped(cellBuilder); 263 record(); 264 scanner.close(); 265 } 266 267 @Override 268 public void shipped() throws IOException { 269 if (shipper != null) { 270 lastCell = RowStatisticsUtil.cloneWithoutValue(cellBuilder, lastCell); 271 rowStatistics.shipped(cellBuilder); 272 shipper.shipped(); 273 } 274 } 275 276 private void consumeCells(List<? super ExtendedCell> result) { 277 if (result.isEmpty()) { 278 return; 279 } 280 // each next() call returns at most 1 row (maybe less for large rows) 281 // so we just need to check if the first cell has changed rows 282 ExtendedCell first = (ExtendedCell) result.get(0); 283 if (rowChanged(first)) { 284 rowStatistics.handleRowChanged(lastCell); 285 } 286 for (int i = 0; i < result.size(); i++) { 287 ExtendedCell cell = (ExtendedCell) result.get(i); 288 rowStatistics.consumeCell(cell); 289 lastCell = cell; 290 } 291 } 292 293 private boolean rowChanged(Cell cell) { 294 if (lastCell == null) { 295 return false; 296 } 297 return !CellUtil.matchingRows(lastCell, cell); 298 } 299 300 private void record() { 301 RowStatisticsTableRecorder tableRecorder = 302 (RowStatisticsTableRecorder) regionEnv.getSharedData().computeIfAbsent(TABLE_RECORDER_KEY, 303 k -> RowStatisticsTableRecorder.forClusterConnection(regionEnv.getConnection(), 304 rowStatisticsDropped, rowStatisticsPutFailed)); 305 if (tableRecorder != null) { 306 tableRecorder.record(this.rowStatistics, 307 Optional.of(regionEnv.getRegion().getRegionInfo().getRegionName())); 308 } else { 309 LOG.error( 310 "Failed to initialize a TableRecorder. Will not record row statistics for region={}", 311 rowStatistics.getRegion()); 312 rowStatisticsDropped.increment(); 313 } 314 if (customRecorder != null) { 315 customRecorder.record(this.rowStatistics, Optional.empty()); 316 } 317 } 318 } 319}