/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor.example.row.stats;

import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.CF;
import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.NAMESPACE;
import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.NAMESPACED_TABLE_NAME;
import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.TABLE_RECORDER_KEY;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.RawCellBuilder;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.RowStatisticsRecorder;
import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.RowStatisticsTableRecorder;
import org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsUtil;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.metrics.Counter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Shipper;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

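/**
 * Observes compactions on non-system tables, computes per-row statistics while the compaction
 * scanner runs, and records the results when the scanner closes. The master-side hook creates the
 * namespace and table that the default table recorder writes to.
 * <p>
 * One possible deployment (an assumption, not prescribed by this class) is to list this class in
 * both {@code hbase.coprocessor.region.classes} and {@code hbase.coprocessor.master.classes} in
 * hbase-site.xml, since it implements both a region observer and a master observer.
 * </p>
 */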
@InterfaceAudience.Private
public class RowStatisticsCompactionObserver
  implements RegionCoprocessor, RegionObserver, MasterCoprocessor, MasterObserver {

  private static final Logger LOG = LoggerFactory.getLogger(RowStatisticsCompactionObserver.class);

  // Largest bucket size in the private field BucketAllocator.DEFAULT_BUCKET_SIZES
  private static final long DEFAULT_MAX_BUCKET_SIZE = 512 * 1024 + 1024;
  private static final ConcurrentMap<TableName, Long> TABLE_COUNTERS = new ConcurrentHashMap<>();
  private static final String ROW_STATISTICS_DROPPED = "rowStatisticsDropped";
  private static final String ROW_STATISTICS_PUT_FAILED = "rowStatisticsPutFailures";
  private Counter rowStatisticsDropped;
  private Counter rowStatisticsPutFailed;
  private long maxCacheSize;
  private final RowStatisticsRecorder recorder;

  @InterfaceAudience.Private
  public RowStatisticsCompactionObserver(RowStatisticsRecorder recorder) {
    this.recorder = recorder;
  }

  public RowStatisticsCompactionObserver() {
    this(null);
  }

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

  @Override
  public Optional<MasterObserver> getMasterObserver() {
    return Optional.of(this);
  }

  @Override
  public void start(CoprocessorEnvironment e) throws IOException {
    if (!(e instanceof RegionCoprocessorEnvironment)) {
      return;
    }
    RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment) e;
    if (regionEnv.getRegionInfo().getTable().isSystemTable()) {
      return;
    }
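    // The largest configured bucket bounds the biggest block the bucket cache can hold;
    // fall back to the largest default bucket size when no buckets are configured.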
    String[] configuredBuckets =
      regionEnv.getConfiguration().getStrings(BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY);
    maxCacheSize = DEFAULT_MAX_BUCKET_SIZE;
    if (configuredBuckets != null && configuredBuckets.length > 0) {
      String lastBucket = configuredBuckets[configuredBuckets.length - 1];
      try {
        maxCacheSize = Integer.parseInt(lastBucket.trim());
      } catch (NumberFormatException ex) {
        LOG.warn("Failed to parse {} value {} as int", BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY,
          lastBucket, ex);
      }
    }
    rowStatisticsDropped =
      regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_DROPPED);
    rowStatisticsPutFailed =
      regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_PUT_FAILED);
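    // Count this region against its table so stop() can tell when the last region
    // on this region server goes away.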
    TableName tableName = regionEnv.getRegionInfo().getTable();
    TABLE_COUNTERS.merge(tableName, 1L, Long::sum);
  }

  @Override
  public void stop(CoprocessorEnvironment e) throws IOException {
    if (!(e instanceof RegionCoprocessorEnvironment)) {
      return;
    }
    RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment) e;
    if (regionEnv.getRegionInfo().getTable().isSystemTable()) {
      return;
    }
    TableName tableName = regionEnv.getRegionInfo().getTable();
    long tableCount = TABLE_COUNTERS.merge(tableName, -1L, Long::sum);
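    // Remove the server-wide metrics and close the shared recorder only once the last
    // observed region on this region server has stopped.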
    if (tableCount == 0) {
      long regionCount = 0;
      for (long count : TABLE_COUNTERS.values()) {
        regionCount += count;
      }
      if (regionCount == 0) {
        regionEnv.getMetricRegistryForRegionServer().remove(ROW_STATISTICS_DROPPED,
          rowStatisticsDropped);
        regionEnv.getMetricRegistryForRegionServer().remove(ROW_STATISTICS_PUT_FAILED,
          rowStatisticsPutFailed);
        RowStatisticsTableRecorder tableRecorder =
          (RowStatisticsTableRecorder) regionEnv.getSharedData().get(TABLE_RECORDER_KEY);
        if (tableRecorder != null) {
          regionEnv.getSharedData().remove(TABLE_RECORDER_KEY, tableRecorder);
          tableRecorder.close();
        }
      }
    }
  }

  @Override
  public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
    throws IOException {
    try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) {
      if (admin.tableExists(NAMESPACED_TABLE_NAME)) {
        LOG.info("Table {} already exists. Skipping table creation process.",
          NAMESPACED_TABLE_NAME);
      } else {
        boolean shouldCreateNamespace =
          Arrays.stream(admin.listNamespaces()).noneMatch(namespace -> namespace.equals(NAMESPACE));
        if (shouldCreateNamespace) {
          NamespaceDescriptor nd = NamespaceDescriptor.create(NAMESPACE).build();
          try {
            admin.createNamespace(nd);
          } catch (IOException e) {
            LOG.error("Failed to create namespace {}", NAMESPACE, e);
          }
        }
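        // Keep a bounded history of statistics: up to 25 versions per cell, expiring after 7 days.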
        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(CF).setMaxVersions(25)
          .setTimeToLive((int) Duration.ofDays(7).toSeconds()).build();
        TableDescriptor td =
          TableDescriptorBuilder.newBuilder(NAMESPACED_TABLE_NAME).setColumnFamily(cfd).build();
        LOG.info("Creating table {}", NAMESPACED_TABLE_NAME);
        try {
          admin.createTable(td);
        } catch (IOException e) {
          LOG.error("Failed to create table {}", NAMESPACED_TABLE_NAME, e);
        }
      }
    } catch (IOException e) {
      LOG.error("Failed to get Connection or Admin. Cannot determine if table {} exists.",
        NAMESPACED_TABLE_NAME, e);
    }
  }

  @Override
  public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> context,
    Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
    CompactionRequest request) {
    if (store.getTableName().isSystemTable()) {
      return scanner;
    }
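    // Wrap the compaction scanner so every cell the compaction reads is also fed into a
    // fresh RowStatisticsImpl for this store.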
    int blocksize = store.getColumnFamilyDescriptor().getBlocksize();
    boolean isMajor = request.isMajor();
    RowStatisticsImpl stats = new RowStatisticsImpl(store.getTableName().getNameAsString(),
      store.getRegionInfo().getEncodedName(), store.getColumnFamilyName(), blocksize, maxCacheSize,
      isMajor);
    return new RowStatisticsScanner(scanner, stats, context.getEnvironment(), recorder);
  }

  private static class RowStatisticsScanner implements InternalScanner, Shipper {

    private final InternalScanner scanner;
    private final Shipper shipper;
    private final RowStatisticsImpl rowStatistics;
    private final RegionCoprocessorEnvironment regionEnv;
    private final Counter rowStatisticsDropped;
    private final Counter rowStatisticsPutFailed;
    private final RowStatisticsRecorder customRecorder;
    private final RawCellBuilder cellBuilder;
    private Cell lastCell;

    public RowStatisticsScanner(InternalScanner scanner, RowStatisticsImpl rowStatistics,
      RegionCoprocessorEnvironment regionEnv, RowStatisticsRecorder customRecorder) {
      this.scanner = scanner;
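      // Compaction scanners generally implement Shipper; keep the delegate so shipped()
      // notifications can be forwarded and row state preserved across shipped batches.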
      if (scanner instanceof Shipper) {
        this.shipper = (Shipper) scanner;
      } else {
        this.shipper = null;
      }
      this.rowStatistics = rowStatistics;
      this.regionEnv = regionEnv;
      this.cellBuilder = regionEnv.getCellBuilder();
      this.rowStatisticsDropped =
        regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_DROPPED);
      this.rowStatisticsPutFailed =
        regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_PUT_FAILED);
      this.customRecorder = customRecorder;
    }

    @Override
    public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
      throws IOException {
      boolean ret = scanner.next(result, scannerContext);
      consumeCells(result);
      return ret;
    }

    @Override
    public boolean next(List<? super ExtendedCell> result) throws IOException {
      boolean ret = scanner.next(result);
      consumeCells(result);
      return ret;
    }

    @Override
    public void close() throws IOException {
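      // Account for the final row, then record the statistics before closing the
      // wrapped scanner.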
      rowStatistics.handleRowChanged(lastCell);
      rowStatistics.shipped(cellBuilder);
      record();
      scanner.close();
    }

    @Override
    public void shipped() throws IOException {
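      // After shipped(), the buffers backing the cells may be reused or released, so keep a
      // value-free copy of lastCell that does not point into the scanner's internal buffers.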
      if (shipper != null) {
        lastCell = RowStatisticsUtil.cloneWithoutValue(cellBuilder, lastCell);
        rowStatistics.shipped(cellBuilder);
        shipper.shipped();
      }
    }

    private void consumeCells(List<? super ExtendedCell> result) {
      if (result.isEmpty()) {
        return;
      }
      // Each next() call returns cells from at most one row (possibly only part of a large
      // row), so only the first cell needs to be checked for a row change.
      ExtendedCell first = (ExtendedCell) result.get(0);
      if (rowChanged(first)) {
        rowStatistics.handleRowChanged(lastCell);
      }
      for (int i = 0; i < result.size(); i++) {
        ExtendedCell cell = (ExtendedCell) result.get(i);
        rowStatistics.consumeCell(cell);
        lastCell = cell;
      }
    }

    private boolean rowChanged(Cell cell) {
      if (lastCell == null) {
        return false;
      }
      return !CellUtil.matchingRows(lastCell, cell);
    }

    private void record() {
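      // Lazily create one table recorder per region server, shared via the coprocessor
      // shared-data map. forClusterConnection may return null, in which case nothing is
      // stored and the statistics are counted as dropped below.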
      RowStatisticsTableRecorder tableRecorder =
        (RowStatisticsTableRecorder) regionEnv.getSharedData().computeIfAbsent(TABLE_RECORDER_KEY,
          k -> RowStatisticsTableRecorder.forClusterConnection(regionEnv.getConnection(),
            rowStatisticsDropped, rowStatisticsPutFailed));
      if (tableRecorder != null) {
        tableRecorder.record(this.rowStatistics,
          Optional.of(regionEnv.getRegion().getRegionInfo().getRegionName()));
      } else {
        LOG.error(
          "Failed to initialize a TableRecorder. Will not record row statistics for region={}",
          rowStatistics.getRegion());
        rowStatisticsDropped.increment();
      }
      if (customRecorder != null) {
        customRecorder.record(this.rowStatistics, Optional.empty());
      }
    }
  }
}