001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.util.HashMap;
021import java.util.List;
022import java.util.Map;
023import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
024import org.apache.hadoop.hbase.HBaseInterfaceAudience;
025import org.apache.hadoop.hbase.metrics.BaseSource;
026import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
027import org.apache.hadoop.hbase.util.Pair;
028import org.apache.hadoop.hbase.wal.WAL.Entry;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * This class is for maintaining the various replication statistics for a source and publishing them
035 * through the metrics interfaces.
036 */
037@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
038public class MetricsSource implements BaseSource {
039
040  private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class);
041
042  // tracks last shipped timestamp for each wal group
043  private Map<String, Long> lastShippedTimeStamps = new HashMap<String, Long>();
044  private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
045  private long lastHFileRefsQueueSize = 0;
046  private String id;
047  private long timeStampNextToReplicate;
048
049  private final MetricsReplicationSourceSource singleSourceSource;
050  private final MetricsReplicationGlobalSourceSource globalSourceSource;
051  private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
052
053  /**
054   * Constructor used to register the metrics
055   * @param id Name of the source this class is monitoring
056   */
057  public MetricsSource(String id) {
058    this.id = id;
059    singleSourceSource = CompatibilitySingletonFactory
060      .getInstance(MetricsReplicationSourceFactory.class).getSource(id);
061    globalSourceSource = CompatibilitySingletonFactory
062      .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
063    singleSourceSourceByTable = new HashMap<>();
064  }
065
066  /**
067   * Constructor for injecting custom (or test) MetricsReplicationSourceSources
068   * @param id                 Name of the source this class is monitoring
069   * @param singleSourceSource Class to monitor id-scoped metrics
070   * @param globalSourceSource Class to monitor global-scoped metrics
071   */
072  public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
073    MetricsReplicationGlobalSourceSource globalSourceSource,
074    Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
075    this.id = id;
076    this.singleSourceSource = singleSourceSource;
077    this.globalSourceSource = globalSourceSource;
078    this.singleSourceSourceByTable = singleSourceSourceByTable;
079  }
080
081  /**
082   * Set the age of the last edit that was shipped
083   * @param timestamp target write time of the edit
084   * @param walGroup  which group we are setting
085   */
086  public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
087    long age = EnvironmentEdgeManager.currentTime() - timestamp;
088    singleSourceSource.setLastShippedAge(age);
089    globalSourceSource.setLastShippedAge(age);
090    this.ageOfLastShippedOp.put(walGroup, age);
091    this.lastShippedTimeStamps.put(walGroup, timestamp);
092  }
093
094  /**
095   * Update the table level replication metrics per table
096   * @param walEntries List of pairs of WAL entry and it's size
097   */
098  public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) {
099    for (Pair<Entry, Long> walEntryWithSize : walEntries) {
100      Entry entry = walEntryWithSize.getFirst();
101      long entrySize = walEntryWithSize.getSecond();
102      String tableName = entry.getKey().getTableName().getNameAsString();
103      long writeTime = entry.getKey().getWriteTime();
104      long age = EnvironmentEdgeManager.currentTime() - writeTime;
105
106      // get the replication metrics source for table at the run time
107      MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable()
108        .computeIfAbsent(tableName, t -> CompatibilitySingletonFactory
109          .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t));
110      tableSource.setLastShippedAge(age);
111      tableSource.incrShippedBytes(entrySize);
112    }
113  }
114
115  /**
116   * Set the age of the last edit that was shipped group by table
117   * @param timestamp write time of the edit
118   * @param tableName String as group and tableName
119   */
120  public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) {
121    long age = EnvironmentEdgeManager.currentTime() - timestamp;
122    this.getSingleSourceSourceByTable()
123      .computeIfAbsent(tableName, t -> CompatibilitySingletonFactory
124        .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t))
125      .setLastShippedAge(age);
126  }
127
128  /**
129   * get age of last shipped op of given wal group. If the walGroup is null, return 0
130   * @param walGroup which group we are getting
131   */
132  public long getAgeOfLastShippedOp(String walGroup) {
133    return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
134  }
135
136  /**
137   * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
138   * when replication fails and need to keep that metric accurate.
139   * @param walGroupId id of the group to update
140   */
141  public void refreshAgeOfLastShippedOp(String walGroupId) {
142    Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId);
143    if (lastTimestamp == null) {
144      this.lastShippedTimeStamps.put(walGroupId, 0L);
145      lastTimestamp = 0L;
146    }
147    if (lastTimestamp > 0) {
148      setAgeOfLastShippedOp(lastTimestamp, walGroupId);
149    }
150  }
151
152  /**
153   * Increment size of the log queue.
154   */
155  public void incrSizeOfLogQueue() {
156    singleSourceSource.incrSizeOfLogQueue(1);
157    globalSourceSource.incrSizeOfLogQueue(1);
158  }
159
160  public void decrSizeOfLogQueue() {
161    singleSourceSource.decrSizeOfLogQueue(1);
162    globalSourceSource.decrSizeOfLogQueue(1);
163  }
164
165  /**
166   * Increment the count for initializing sources
167   */
168  public void incrSourceInitializing() {
169    singleSourceSource.incrSourceInitializing();
170    globalSourceSource.incrSourceInitializing();
171  }
172
173  /**
174   * Decrement the count for initializing sources
175   */
176  public void decrSourceInitializing() {
177    singleSourceSource.decrSourceInitializing();
178    globalSourceSource.decrSourceInitializing();
179  }
180
181  /**
182   * Add on the the number of log edits read
183   * @param delta the number of log edits read.
184   */
185  private void incrLogEditsRead(long delta) {
186    singleSourceSource.incrLogReadInEdits(delta);
187    globalSourceSource.incrLogReadInEdits(delta);
188  }
189
190  /** Increment the number of log edits read by one. */
191  public void incrLogEditsRead() {
192    incrLogEditsRead(1);
193  }
194
195  /**
196   * Add on the number of log edits filtered
197   * @param delta the number filtered.
198   */
199  public void incrLogEditsFiltered(long delta) {
200    singleSourceSource.incrLogEditsFiltered(delta);
201    globalSourceSource.incrLogEditsFiltered(delta);
202  }
203
204  /** The number of log edits filtered out. */
205  public void incrLogEditsFiltered() {
206    incrLogEditsFiltered(1);
207  }
208
209  /**
210   * Convience method to apply changes to metrics do to shipping a batch of logs.
211   * @param batchSize the size of the batch that was shipped to sinks.
212   */
213  public void shipBatch(long batchSize, int sizeInBytes) {
214    singleSourceSource.incrBatchesShipped(1);
215    globalSourceSource.incrBatchesShipped(1);
216
217    singleSourceSource.incrOpsShipped(batchSize);
218    globalSourceSource.incrOpsShipped(batchSize);
219
220    singleSourceSource.incrShippedBytes(sizeInBytes);
221    globalSourceSource.incrShippedBytes(sizeInBytes);
222  }
223
224  /**
225   * Convenience method to update metrics when batch of operations has failed.
226   */
227  public void incrementFailedBatches() {
228    singleSourceSource.incrFailedBatches();
229    globalSourceSource.incrFailedBatches();
230  }
231
232  /**
233   * Gets the number of edits not eligible for replication this source queue logs so far.
234   * @return logEditsFiltered non-replicable edits filtered from this queue logs.
235   */
236  public long getEditsFiltered() {
237    return this.singleSourceSource.getEditsFiltered();
238  }
239
240  /**
241   * Gets the number of edits eligible for replication read from this source queue logs so far.
242   * @return replicableEdits total number of replicable edits read from this queue logs.
243   */
244  public long getReplicableEdits() {
245    return this.singleSourceSource.getWALEditsRead() - this.singleSourceSource.getEditsFiltered();
246  }
247
248  /**
249   * Gets the number of OPs shipped by this source queue to target cluster.
250   * @return oPsShipped total number of OPs shipped by this source.
251   */
252  public long getOpsShipped() {
253    return this.singleSourceSource.getShippedOps();
254  }
255
256  /**
257   * Convience method to apply changes to metrics do to shipping a batch of logs.
258   * @param batchSize the size of the batch that was shipped to sinks.
259   * @param hfiles    total number of hfiles shipped to sinks.
260   */
261  public void shipBatch(long batchSize, int sizeInBytes, long hfiles) {
262    shipBatch(batchSize, sizeInBytes);
263    singleSourceSource.incrHFilesShipped(hfiles);
264    globalSourceSource.incrHFilesShipped(hfiles);
265  }
266
267  /** increase the byte number read by source from log file */
268  public void incrLogReadInBytes(long readInBytes) {
269    singleSourceSource.incrLogReadInBytes(readInBytes);
270    globalSourceSource.incrLogReadInBytes(readInBytes);
271  }
272
273  /** Removes all metrics about this Source. */
274  public void clear() {
275    terminate();
276    singleSourceSource.clear();
277  }
278
279  public void terminate() {
280    int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
281    globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
282    singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
283    globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
284    lastShippedTimeStamps.clear();
285    lastHFileRefsQueueSize = 0;
286    timeStampNextToReplicate = 0;
287  }
288
289  /**
290   * Get AgeOfLastShippedOp
291   */
292  public Long getAgeOfLastShippedOp() {
293    return singleSourceSource.getLastShippedAge();
294  }
295
296  /**
297   * Get the sizeOfLogQueue
298   */
299  public int getSizeOfLogQueue() {
300    return singleSourceSource.getSizeOfLogQueue();
301  }
302
303  /**
304   * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one
305   * @deprecated Since 2.0.0. Removed in 3.0.0.
306   * @see #getTimestampOfLastShippedOp()
307   */
308  @Deprecated
309  public long getTimeStampOfLastShippedOp() {
310    return getTimestampOfLastShippedOp();
311  }
312
313  /**
314   * Get the value of uncleanlyClosedWAL counter
315   */
316  public long getUncleanlyClosedWALs() {
317    return singleSourceSource.getUncleanlyClosedWALs();
318  }
319
320  /**
321   * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one
322   */
323  public long getTimestampOfLastShippedOp() {
324    long lastTimestamp = 0L;
325    for (long ts : lastShippedTimeStamps.values()) {
326      if (ts > lastTimestamp) {
327        lastTimestamp = ts;
328      }
329    }
330    return lastTimestamp;
331  }
332
333  /**
334   * TimeStamp of next edit to be replicated.
335   * @return timeStampNextToReplicate - TimeStamp of next edit to be replicated.
336   */
337  public long getTimeStampNextToReplicate() {
338    return timeStampNextToReplicate;
339  }
340
341  /**
342   * TimeStamp of next edit targeted for replication. Used for calculating lag, as if this timestamp
343   * is greater than timestamp of last shipped, it means there's at least one edit pending
344   * replication.
345   * @param timeStampNextToReplicate timestamp of next edit in the queue that should be replicated.
346   */
347  public void setTimeStampNextToReplicate(long timeStampNextToReplicate) {
348    this.timeStampNextToReplicate = timeStampNextToReplicate;
349  }
350
351  public long getReplicationDelay() {
352    if (getTimestampOfLastShippedOp() >= timeStampNextToReplicate) {
353      return 0;
354    } else {
355      return EnvironmentEdgeManager.currentTime() - timeStampNextToReplicate;
356    }
357  }
358
359  /**
360   * Get the source initializing counts
361   * @return number of replication sources getting initialized
362   */
363  public int getSourceInitializing() {
364    return singleSourceSource.getSourceInitializing();
365  }
366
367  /**
368   * Get the slave peer ID
369   */
370  public String getPeerID() {
371    return id;
372  }
373
374  public void incrSizeOfHFileRefsQueue(long size) {
375    singleSourceSource.incrSizeOfHFileRefsQueue(size);
376    globalSourceSource.incrSizeOfHFileRefsQueue(size);
377    lastHFileRefsQueueSize = size;
378  }
379
380  public void decrSizeOfHFileRefsQueue(int size) {
381    singleSourceSource.decrSizeOfHFileRefsQueue(size);
382    globalSourceSource.decrSizeOfHFileRefsQueue(size);
383    lastHFileRefsQueueSize -= size;
384    if (lastHFileRefsQueueSize < 0) {
385      lastHFileRefsQueueSize = 0;
386    }
387  }
388
389  public void incrUnknownFileLengthForClosedWAL() {
390    singleSourceSource.incrUnknownFileLengthForClosedWAL();
391    globalSourceSource.incrUnknownFileLengthForClosedWAL();
392  }
393
394  public void incrUncleanlyClosedWALs() {
395    singleSourceSource.incrUncleanlyClosedWALs();
396    globalSourceSource.incrUncleanlyClosedWALs();
397  }
398
399  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
400    singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
401    globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
402  }
403
404  public void incrRestartedWALReading() {
405    singleSourceSource.incrRestartedWALReading();
406    globalSourceSource.incrRestartedWALReading();
407  }
408
409  public void incrRepeatedFileBytes(final long bytes) {
410    singleSourceSource.incrRepeatedFileBytes(bytes);
411    globalSourceSource.incrRepeatedFileBytes(bytes);
412  }
413
414  public void incrCompletedWAL() {
415    singleSourceSource.incrCompletedWAL();
416    globalSourceSource.incrCompletedWAL();
417  }
418
419  public void incrCompletedRecoveryQueue() {
420    singleSourceSource.incrCompletedRecoveryQueue();
421    globalSourceSource.incrCompletedRecoveryQueue();
422  }
423
424  public void incrFailedRecoveryQueue() {
425    globalSourceSource.incrFailedRecoveryQueue();
426  }
427
428  /*
429   * Sets the age of oldest log file just for source.
430   */
431  public void setOldestWalAge(long age) {
432    singleSourceSource.setOldestWalAge(age);
433  }
434
435  public long getOldestWalAge() {
436    return singleSourceSource.getOldestWalAge();
437  }
438
439  @Override
440  public void init() {
441    singleSourceSource.init();
442    globalSourceSource.init();
443  }
444
445  @Override
446  public void setGauge(String gaugeName, long value) {
447    singleSourceSource.setGauge(gaugeName, value);
448    globalSourceSource.setGauge(gaugeName, value);
449  }
450
451  @Override
452  public void incGauge(String gaugeName, long delta) {
453    singleSourceSource.incGauge(gaugeName, delta);
454    globalSourceSource.incGauge(gaugeName, delta);
455  }
456
457  @Override
458  public void decGauge(String gaugeName, long delta) {
459    singleSourceSource.decGauge(gaugeName, delta);
460    globalSourceSource.decGauge(gaugeName, delta);
461  }
462
463  @Override
464  public void removeMetric(String key) {
465    singleSourceSource.removeMetric(key);
466    globalSourceSource.removeMetric(key);
467  }
468
469  @Override
470  public void incCounters(String counterName, long delta) {
471    singleSourceSource.incCounters(counterName, delta);
472    globalSourceSource.incCounters(counterName, delta);
473  }
474
475  @Override
476  public void updateHistogram(String name, long value) {
477    singleSourceSource.updateHistogram(name, value);
478    globalSourceSource.updateHistogram(name, value);
479  }
480
481  @Override
482  public String getMetricsContext() {
483    return globalSourceSource.getMetricsContext();
484  }
485
486  @Override
487  public String getMetricsDescription() {
488    return globalSourceSource.getMetricsDescription();
489  }
490
491  @Override
492  public String getMetricsJmxContext() {
493    return globalSourceSource.getMetricsJmxContext();
494  }
495
496  @Override
497  public String getMetricsName() {
498    return globalSourceSource.getMetricsName();
499  }
500
501  @InterfaceAudience.Private
502  public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
503    return singleSourceSourceByTable;
504  }
505
506  /**
507   * Sets the amount of memory in bytes used in this RegionServer by edits pending replication.
508   */
509  public void setWALReaderEditsBufferUsage(long usageInBytes) {
510    globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
511  }
512
513  /**
514   * Returns the amount of memory in bytes used in this RegionServer by edits pending replication.
515   */
516  public long getWALReaderEditsBufferUsage() {
517    return globalSourceSource.getWALReaderEditsBufferBytes();
518  }
519}