001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.util.HashMap; 021import java.util.List; 022import java.util.Map; 023import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 024import org.apache.hadoop.hbase.HBaseInterfaceAudience; 025import org.apache.hadoop.hbase.metrics.BaseSource; 026import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 027import org.apache.hadoop.hbase.util.Pair; 028import org.apache.hadoop.hbase.wal.WAL.Entry; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * This class is for maintaining the various replication statistics for a source and publishing them 035 * through the metrics interfaces. 036 */ 037@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) 038public class MetricsSource implements BaseSource { 039 040 private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class); 041 042 // tracks last shipped timestamp for each wal group 043 private Map<String, Long> lastShippedTimeStamps = new HashMap<String, Long>(); 044 private Map<String, Long> ageOfLastShippedOp = new HashMap<>(); 045 private long lastHFileRefsQueueSize = 0; 046 private String id; 047 private long timeStampNextToReplicate; 048 049 private final MetricsReplicationSourceSource singleSourceSource; 050 private final MetricsReplicationGlobalSourceSource globalSourceSource; 051 private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable; 052 053 /** 054 * Constructor used to register the metrics 055 * @param id Name of the source this class is monitoring 056 */ 057 public MetricsSource(String id) { 058 this.id = id; 059 singleSourceSource = CompatibilitySingletonFactory 060 .getInstance(MetricsReplicationSourceFactory.class).getSource(id); 061 globalSourceSource = CompatibilitySingletonFactory 062 .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); 063 singleSourceSourceByTable = new HashMap<>(); 064 } 065 066 /** 067 * Constructor for injecting custom (or test) MetricsReplicationSourceSources 068 * @param id Name of the source this class is monitoring 069 * @param singleSourceSource Class to monitor id-scoped metrics 070 * @param globalSourceSource Class to monitor global-scoped metrics 071 */ 072 public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource, 073 MetricsReplicationGlobalSourceSource globalSourceSource, 074 Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) { 075 this.id = id; 076 this.singleSourceSource = singleSourceSource; 077 this.globalSourceSource = globalSourceSource; 078 this.singleSourceSourceByTable = singleSourceSourceByTable; 079 } 080 081 /** 082 * Set the age of the last edit that was shipped 083 * @param timestamp target write time of the edit 084 * @param walGroup which group we are setting 085 */ 086 public void setAgeOfLastShippedOp(long timestamp, String walGroup) { 087 long age = EnvironmentEdgeManager.currentTime() - timestamp; 088 singleSourceSource.setLastShippedAge(age); 089 globalSourceSource.setLastShippedAge(age); 090 this.ageOfLastShippedOp.put(walGroup, age); 091 this.lastShippedTimeStamps.put(walGroup, timestamp); 092 } 093 094 /** 095 * Update the table level replication metrics per table 096 * @param walEntries List of pairs of WAL entry and it's size 097 */ 098 public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) { 099 for (Pair<Entry, Long> walEntryWithSize : walEntries) { 100 Entry entry = walEntryWithSize.getFirst(); 101 long entrySize = walEntryWithSize.getSecond(); 102 String tableName = entry.getKey().getTableName().getNameAsString(); 103 long writeTime = entry.getKey().getWriteTime(); 104 long age = EnvironmentEdgeManager.currentTime() - writeTime; 105 106 // get the replication metrics source for table at the run time 107 MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable() 108 .computeIfAbsent(tableName, t -> CompatibilitySingletonFactory 109 .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t)); 110 tableSource.setLastShippedAge(age); 111 tableSource.incrShippedBytes(entrySize); 112 } 113 } 114 115 /** 116 * Set the age of the last edit that was shipped group by table 117 * @param timestamp write time of the edit 118 * @param tableName String as group and tableName 119 */ 120 public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) { 121 long age = EnvironmentEdgeManager.currentTime() - timestamp; 122 this.getSingleSourceSourceByTable() 123 .computeIfAbsent(tableName, t -> CompatibilitySingletonFactory 124 .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t)) 125 .setLastShippedAge(age); 126 } 127 128 /** 129 * get age of last shipped op of given wal group. If the walGroup is null, return 0 130 * @param walGroup which group we are getting 131 */ 132 public long getAgeOfLastShippedOp(String walGroup) { 133 return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup); 134 } 135 136 /** 137 * Convenience method to use the last given timestamp to refresh the age of the last edit. Used 138 * when replication fails and need to keep that metric accurate. 139 * @param walGroupId id of the group to update 140 */ 141 public void refreshAgeOfLastShippedOp(String walGroupId) { 142 Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId); 143 if (lastTimestamp == null) { 144 this.lastShippedTimeStamps.put(walGroupId, 0L); 145 lastTimestamp = 0L; 146 } 147 if (lastTimestamp > 0) { 148 setAgeOfLastShippedOp(lastTimestamp, walGroupId); 149 } 150 } 151 152 /** 153 * Increment size of the log queue. 154 */ 155 public void incrSizeOfLogQueue() { 156 singleSourceSource.incrSizeOfLogQueue(1); 157 globalSourceSource.incrSizeOfLogQueue(1); 158 } 159 160 public void decrSizeOfLogQueue() { 161 singleSourceSource.decrSizeOfLogQueue(1); 162 globalSourceSource.decrSizeOfLogQueue(1); 163 } 164 165 /** 166 * Increment the count for initializing sources 167 */ 168 public void incrSourceInitializing() { 169 singleSourceSource.incrSourceInitializing(); 170 globalSourceSource.incrSourceInitializing(); 171 } 172 173 /** 174 * Decrement the count for initializing sources 175 */ 176 public void decrSourceInitializing() { 177 singleSourceSource.decrSourceInitializing(); 178 globalSourceSource.decrSourceInitializing(); 179 } 180 181 /** 182 * Add on the the number of log edits read 183 * @param delta the number of log edits read. 184 */ 185 private void incrLogEditsRead(long delta) { 186 singleSourceSource.incrLogReadInEdits(delta); 187 globalSourceSource.incrLogReadInEdits(delta); 188 } 189 190 /** Increment the number of log edits read by one. */ 191 public void incrLogEditsRead() { 192 incrLogEditsRead(1); 193 } 194 195 /** 196 * Add on the number of log edits filtered 197 * @param delta the number filtered. 198 */ 199 public void incrLogEditsFiltered(long delta) { 200 singleSourceSource.incrLogEditsFiltered(delta); 201 globalSourceSource.incrLogEditsFiltered(delta); 202 } 203 204 /** The number of log edits filtered out. */ 205 public void incrLogEditsFiltered() { 206 incrLogEditsFiltered(1); 207 } 208 209 /** 210 * Convience method to apply changes to metrics do to shipping a batch of logs. 211 * @param batchSize the size of the batch that was shipped to sinks. 212 */ 213 public void shipBatch(long batchSize, int sizeInBytes) { 214 singleSourceSource.incrBatchesShipped(1); 215 globalSourceSource.incrBatchesShipped(1); 216 217 singleSourceSource.incrOpsShipped(batchSize); 218 globalSourceSource.incrOpsShipped(batchSize); 219 220 singleSourceSource.incrShippedBytes(sizeInBytes); 221 globalSourceSource.incrShippedBytes(sizeInBytes); 222 } 223 224 /** 225 * Convenience method to update metrics when batch of operations has failed. 226 */ 227 public void incrementFailedBatches() { 228 singleSourceSource.incrFailedBatches(); 229 globalSourceSource.incrFailedBatches(); 230 } 231 232 /** 233 * Gets the number of edits not eligible for replication this source queue logs so far. 234 * @return logEditsFiltered non-replicable edits filtered from this queue logs. 235 */ 236 public long getEditsFiltered() { 237 return this.singleSourceSource.getEditsFiltered(); 238 } 239 240 /** 241 * Gets the number of edits eligible for replication read from this source queue logs so far. 242 * @return replicableEdits total number of replicable edits read from this queue logs. 243 */ 244 public long getReplicableEdits() { 245 return this.singleSourceSource.getWALEditsRead() - this.singleSourceSource.getEditsFiltered(); 246 } 247 248 /** 249 * Gets the number of OPs shipped by this source queue to target cluster. 250 * @return oPsShipped total number of OPs shipped by this source. 251 */ 252 public long getOpsShipped() { 253 return this.singleSourceSource.getShippedOps(); 254 } 255 256 /** 257 * Convience method to apply changes to metrics do to shipping a batch of logs. 258 * @param batchSize the size of the batch that was shipped to sinks. 259 * @param hfiles total number of hfiles shipped to sinks. 260 */ 261 public void shipBatch(long batchSize, int sizeInBytes, long hfiles) { 262 shipBatch(batchSize, sizeInBytes); 263 singleSourceSource.incrHFilesShipped(hfiles); 264 globalSourceSource.incrHFilesShipped(hfiles); 265 } 266 267 /** increase the byte number read by source from log file */ 268 public void incrLogReadInBytes(long readInBytes) { 269 singleSourceSource.incrLogReadInBytes(readInBytes); 270 globalSourceSource.incrLogReadInBytes(readInBytes); 271 } 272 273 /** Removes all metrics about this Source. */ 274 public void clear() { 275 terminate(); 276 singleSourceSource.clear(); 277 } 278 279 public void terminate() { 280 int lastQueueSize = singleSourceSource.getSizeOfLogQueue(); 281 globalSourceSource.decrSizeOfLogQueue(lastQueueSize); 282 singleSourceSource.decrSizeOfLogQueue(lastQueueSize); 283 globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize); 284 lastShippedTimeStamps.clear(); 285 lastHFileRefsQueueSize = 0; 286 timeStampNextToReplicate = 0; 287 } 288 289 /** 290 * Get AgeOfLastShippedOp 291 */ 292 public Long getAgeOfLastShippedOp() { 293 return singleSourceSource.getLastShippedAge(); 294 } 295 296 /** 297 * Get the sizeOfLogQueue 298 */ 299 public int getSizeOfLogQueue() { 300 return singleSourceSource.getSizeOfLogQueue(); 301 } 302 303 /** 304 * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one 305 * @deprecated Since 2.0.0. Removed in 3.0.0. 306 * @see #getTimestampOfLastShippedOp() 307 */ 308 @Deprecated 309 public long getTimeStampOfLastShippedOp() { 310 return getTimestampOfLastShippedOp(); 311 } 312 313 /** 314 * Get the value of uncleanlyClosedWAL counter 315 */ 316 public long getUncleanlyClosedWALs() { 317 return singleSourceSource.getUncleanlyClosedWALs(); 318 } 319 320 /** 321 * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one 322 */ 323 public long getTimestampOfLastShippedOp() { 324 long lastTimestamp = 0L; 325 for (long ts : lastShippedTimeStamps.values()) { 326 if (ts > lastTimestamp) { 327 lastTimestamp = ts; 328 } 329 } 330 return lastTimestamp; 331 } 332 333 /** 334 * TimeStamp of next edit to be replicated. 335 * @return timeStampNextToReplicate - TimeStamp of next edit to be replicated. 336 */ 337 public long getTimeStampNextToReplicate() { 338 return timeStampNextToReplicate; 339 } 340 341 /** 342 * TimeStamp of next edit targeted for replication. Used for calculating lag, as if this timestamp 343 * is greater than timestamp of last shipped, it means there's at least one edit pending 344 * replication. 345 * @param timeStampNextToReplicate timestamp of next edit in the queue that should be replicated. 346 */ 347 public void setTimeStampNextToReplicate(long timeStampNextToReplicate) { 348 this.timeStampNextToReplicate = timeStampNextToReplicate; 349 } 350 351 public long getReplicationDelay() { 352 if (getTimestampOfLastShippedOp() >= timeStampNextToReplicate) { 353 return 0; 354 } else { 355 return EnvironmentEdgeManager.currentTime() - timeStampNextToReplicate; 356 } 357 } 358 359 /** 360 * Get the source initializing counts 361 * @return number of replication sources getting initialized 362 */ 363 public int getSourceInitializing() { 364 return singleSourceSource.getSourceInitializing(); 365 } 366 367 /** 368 * Get the slave peer ID 369 */ 370 public String getPeerID() { 371 return id; 372 } 373 374 public void incrSizeOfHFileRefsQueue(long size) { 375 singleSourceSource.incrSizeOfHFileRefsQueue(size); 376 globalSourceSource.incrSizeOfHFileRefsQueue(size); 377 lastHFileRefsQueueSize = size; 378 } 379 380 public void decrSizeOfHFileRefsQueue(int size) { 381 singleSourceSource.decrSizeOfHFileRefsQueue(size); 382 globalSourceSource.decrSizeOfHFileRefsQueue(size); 383 lastHFileRefsQueueSize -= size; 384 if (lastHFileRefsQueueSize < 0) { 385 lastHFileRefsQueueSize = 0; 386 } 387 } 388 389 public void incrUnknownFileLengthForClosedWAL() { 390 singleSourceSource.incrUnknownFileLengthForClosedWAL(); 391 globalSourceSource.incrUnknownFileLengthForClosedWAL(); 392 } 393 394 public void incrUncleanlyClosedWALs() { 395 singleSourceSource.incrUncleanlyClosedWALs(); 396 globalSourceSource.incrUncleanlyClosedWALs(); 397 } 398 399 public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { 400 singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes); 401 globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes); 402 } 403 404 public void incrRestartedWALReading() { 405 singleSourceSource.incrRestartedWALReading(); 406 globalSourceSource.incrRestartedWALReading(); 407 } 408 409 public void incrRepeatedFileBytes(final long bytes) { 410 singleSourceSource.incrRepeatedFileBytes(bytes); 411 globalSourceSource.incrRepeatedFileBytes(bytes); 412 } 413 414 public void incrCompletedWAL() { 415 singleSourceSource.incrCompletedWAL(); 416 globalSourceSource.incrCompletedWAL(); 417 } 418 419 public void incrCompletedRecoveryQueue() { 420 singleSourceSource.incrCompletedRecoveryQueue(); 421 globalSourceSource.incrCompletedRecoveryQueue(); 422 } 423 424 public void incrFailedRecoveryQueue() { 425 globalSourceSource.incrFailedRecoveryQueue(); 426 } 427 428 /* 429 * Sets the age of oldest log file just for source. 430 */ 431 public void setOldestWalAge(long age) { 432 singleSourceSource.setOldestWalAge(age); 433 } 434 435 public long getOldestWalAge() { 436 return singleSourceSource.getOldestWalAge(); 437 } 438 439 @Override 440 public void init() { 441 singleSourceSource.init(); 442 globalSourceSource.init(); 443 } 444 445 @Override 446 public void setGauge(String gaugeName, long value) { 447 singleSourceSource.setGauge(gaugeName, value); 448 globalSourceSource.setGauge(gaugeName, value); 449 } 450 451 @Override 452 public void incGauge(String gaugeName, long delta) { 453 singleSourceSource.incGauge(gaugeName, delta); 454 globalSourceSource.incGauge(gaugeName, delta); 455 } 456 457 @Override 458 public void decGauge(String gaugeName, long delta) { 459 singleSourceSource.decGauge(gaugeName, delta); 460 globalSourceSource.decGauge(gaugeName, delta); 461 } 462 463 @Override 464 public void removeMetric(String key) { 465 singleSourceSource.removeMetric(key); 466 globalSourceSource.removeMetric(key); 467 } 468 469 @Override 470 public void incCounters(String counterName, long delta) { 471 singleSourceSource.incCounters(counterName, delta); 472 globalSourceSource.incCounters(counterName, delta); 473 } 474 475 @Override 476 public void updateHistogram(String name, long value) { 477 singleSourceSource.updateHistogram(name, value); 478 globalSourceSource.updateHistogram(name, value); 479 } 480 481 @Override 482 public String getMetricsContext() { 483 return globalSourceSource.getMetricsContext(); 484 } 485 486 @Override 487 public String getMetricsDescription() { 488 return globalSourceSource.getMetricsDescription(); 489 } 490 491 @Override 492 public String getMetricsJmxContext() { 493 return globalSourceSource.getMetricsJmxContext(); 494 } 495 496 @Override 497 public String getMetricsName() { 498 return globalSourceSource.getMetricsName(); 499 } 500 501 @InterfaceAudience.Private 502 public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() { 503 return singleSourceSourceByTable; 504 } 505 506 /** 507 * Sets the amount of memory in bytes used in this RegionServer by edits pending replication. 508 */ 509 public void setWALReaderEditsBufferUsage(long usageInBytes) { 510 globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes); 511 } 512 513 /** 514 * Returns the amount of memory in bytes used in this RegionServer by edits pending replication. 515 */ 516 public long getWALReaderEditsBufferUsage() { 517 return globalSourceSource.getWALReaderEditsBufferBytes(); 518 } 519}