001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.List;
023import org.apache.hadoop.hbase.util.Strings;
024import org.apache.yetus.audience.InterfaceAudience;
025
026import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
027
028/**
029 * This class is used for exporting some of the info from replication metrics
030 */
031@InterfaceAudience.Private
032public class ReplicationLoad {
033
034  // Empty load instance.
035  public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad();
036  private MetricsSink sinkMetrics;
037
038  private List<ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceEntries;
039  private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink;
040
041  /** default constructor */
042  public ReplicationLoad() {
043    super();
044  }
045
046  /**
047   * buildReplicationLoad
048   * @param sources List of ReplicationSource instances for which metrics should be reported
049   */
050
051  public void buildReplicationLoad(final List<ReplicationSourceInterface> sources,
052    final MetricsSink skMetrics) {
053    this.sinkMetrics = skMetrics;
054
055    // build the SinkLoad
056    ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild =
057      ClusterStatusProtos.ReplicationLoadSink.newBuilder();
058    rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
059    rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
060    rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp());
061    rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps());
062    this.replicationLoadSink = rLoadSinkBuild.build();
063
064    this.replicationLoadSourceEntries = new ArrayList<>();
065    for (ReplicationSourceInterface source : sources) {
066      MetricsSource sm = source.getSourceMetrics();
067      // Get the actual peer id
068      String peerId = sm.getPeerID();
069      String[] parts = peerId.split("-", 2);
070      peerId = parts.length != 1 ? parts[0] : peerId;
071
072      long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
073      int sizeOfLogQueue = sm.getSizeOfLogQueue();
074      long editsRead = sm.getReplicableEdits();
075      long oPsShipped = sm.getOpsShipped();
076      long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp();
077      long timeStampOfNextToReplicate = sm.getTimeStampNextToReplicate();
078      long replicationLag = sm.getReplicationDelay();
079      ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild =
080        ClusterStatusProtos.ReplicationLoadSource.newBuilder();
081      rLoadSourceBuild.setPeerID(peerId);
082      rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp);
083      rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue);
084      rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp);
085      rLoadSourceBuild.setReplicationLag(replicationLag);
086      rLoadSourceBuild.setTimeStampOfNextToReplicate(timeStampOfNextToReplicate);
087      rLoadSourceBuild.setEditsRead(editsRead);
088      rLoadSourceBuild.setOPsShipped(oPsShipped);
089      if (source instanceof ReplicationSource) {
090        ReplicationSource replSource = (ReplicationSource) source;
091        rLoadSourceBuild.setRecovered(replSource.getReplicationQueueInfo().isQueueRecovered());
092        rLoadSourceBuild.setQueueId(replSource.getReplicationQueueInfo().getQueueId());
093        rLoadSourceBuild.setRunning(replSource.isWorkerRunning());
094        rLoadSourceBuild.setEditsSinceRestart(timeStampOfNextToReplicate > 0);
095      }
096
097      this.replicationLoadSourceEntries.add(rLoadSourceBuild.build());
098    }
099  }
100
101  /**
102   * sourceToString
103   * @return a string contains sourceReplicationLoad information
104   */
105  public String sourceToString() {
106    StringBuilder sb = new StringBuilder();
107
108    for (ClusterStatusProtos.ReplicationLoadSource rls : this.replicationLoadSourceEntries) {
109
110      sb = Strings.appendKeyValue(sb, "\n           PeerID", rls.getPeerID());
111      sb = Strings.appendKeyValue(sb, "AgeOfLastShippedOp", rls.getAgeOfLastShippedOp());
112      sb = Strings.appendKeyValue(sb, "SizeOfLogQueue", rls.getSizeOfLogQueue());
113      sb = Strings.appendKeyValue(sb, "TimestampsOfLastShippedOp",
114        (new Date(rls.getTimeStampOfLastShippedOp()).toString()));
115      sb = Strings.appendKeyValue(sb, "Replication Lag", rls.getReplicationLag());
116    }
117
118    return sb.toString();
119  }
120
121  /**
122   * sinkToString
123   * @return a string contains sinkReplicationLoad information
124   */
125  public String sinkToString() {
126    if (this.replicationLoadSink == null) return null;
127
128    StringBuilder sb = new StringBuilder();
129    sb = Strings.appendKeyValue(sb, "AgeOfLastAppliedOp",
130      this.replicationLoadSink.getAgeOfLastAppliedOp());
131    sb = Strings.appendKeyValue(sb, "TimestampsOfLastAppliedOp",
132      (new Date(this.replicationLoadSink.getTimeStampsOfLastAppliedOp()).toString()));
133
134    return sb.toString();
135  }
136
137  public ClusterStatusProtos.ReplicationLoadSink getReplicationLoadSink() {
138    return this.replicationLoadSink;
139  }
140
141  public List<ClusterStatusProtos.ReplicationLoadSource> getReplicationLoadSourceEntries() {
142    return this.replicationLoadSourceEntries;
143  }
144
145  /**
146   * @see java.lang.Object#toString()
147   */
148  @Override
149  public String toString() {
150    return this.sourceToString() + System.getProperty("line.separator") + this.sinkToString();
151  }
152
153}