001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.HashMap; 024import java.util.List; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.HBaseTestingUtil; 028import org.apache.hadoop.hbase.ServerName; 029import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 030import org.apache.hadoop.hbase.StartTestingClusterOption; 031import org.apache.hadoop.hbase.replication.ReplicationLoadSource; 032import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 033import org.apache.hadoop.hbase.testclassification.MasterTests; 034import org.apache.hadoop.hbase.testclassification.MediumTests; 035import org.apache.hadoop.hbase.util.Pair; 036import org.apache.zookeeper.KeeperException; 037import org.junit.AfterClass; 038import org.junit.BeforeClass; 039import org.junit.ClassRule; 040import org.junit.Test; 041import org.junit.experimental.categories.Category; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; 048 049@Category({ MasterTests.class, MediumTests.class }) 050public class TestGetReplicationLoad { 051 @ClassRule 052 public static final HBaseClassTestRule CLASS_RULE = 053 HBaseClassTestRule.forClass(TestGetReplicationLoad.class); 054 055 private static final Logger LOG = LoggerFactory.getLogger(TestGetReplicationLoad.class); 056 057 private static SingleProcessHBaseCluster cluster; 058 private static HMaster master; 059 private static HBaseTestingUtil TEST_UTIL; 060 061 public static class MyMaster extends HMaster { 062 public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException { 063 super(conf); 064 } 065 } 066 067 @BeforeClass 068 public static void startCluster() throws Exception { 069 LOG.info("Starting cluster"); 070 TEST_UTIL = new HBaseTestingUtil(); 071 // Set master class and use default values for other options. 072 StartTestingClusterOption option = 073 StartTestingClusterOption.builder().masterClass(TestMasterMetrics.MyMaster.class).build(); 074 TEST_UTIL.startMiniCluster(option); 075 cluster = TEST_UTIL.getHBaseCluster(); 076 LOG.info("Waiting for active/ready master"); 077 cluster.waitForActiveAndReadyMaster(); 078 master = cluster.getMaster(); 079 } 080 081 @AfterClass 082 public static void after() throws Exception { 083 if (TEST_UTIL != null) { 084 TEST_UTIL.shutdownMiniCluster(); 085 } 086 } 087 088 @Test 089 public void testGetReplicationMetrics() throws Exception { 090 String peer1 = "test1", peer2 = "test2", queueId = "1"; 091 long ageOfLastShippedOp = 2, replicationLag = 3, timeStampOfLastShippedOp = 4, 092 timeStampOfNextToReplicate = 5, editsRead = 6, oPsShipped = 7; 093 int sizeOfLogQueue = 8; 094 boolean recovered = false, running = false, editsSinceRestart = false; 095 RegionServerStatusProtos.RegionServerReportRequest.Builder request = 096 RegionServerStatusProtos.RegionServerReportRequest.newBuilder(); 097 ServerName serverName = cluster.getMaster(0).getServerName(); 098 request.setServer(ProtobufUtil.toServerName(serverName)); 099 ClusterStatusProtos.ReplicationLoadSource rload1 = ClusterStatusProtos.ReplicationLoadSource 100 .newBuilder().setPeerID(peer1).setAgeOfLastShippedOp(ageOfLastShippedOp) 101 .setReplicationLag(replicationLag).setTimeStampOfLastShippedOp(timeStampOfLastShippedOp) 102 .setSizeOfLogQueue(sizeOfLogQueue).setTimeStampOfNextToReplicate(timeStampOfNextToReplicate) 103 .setQueueId(queueId).setEditsRead(editsRead).setOPsShipped(oPsShipped).setRunning(running) 104 .setRecovered(recovered).setEditsSinceRestart(editsSinceRestart).build(); 105 ClusterStatusProtos.ReplicationLoadSource rload2 = 106 ClusterStatusProtos.ReplicationLoadSource.newBuilder().setPeerID(peer2) 107 .setAgeOfLastShippedOp(ageOfLastShippedOp + 1).setReplicationLag(replicationLag + 1) 108 .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp + 1) 109 .setSizeOfLogQueue(sizeOfLogQueue + 1) 110 .setTimeStampOfNextToReplicate(timeStampOfNextToReplicate + 1).setQueueId(queueId) 111 .setEditsRead(editsRead + 1).setOPsShipped(oPsShipped + 1).setRunning(running) 112 .setRecovered(recovered).setEditsSinceRestart(editsSinceRestart).build(); 113 114 ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder() 115 .addReplLoadSource(rload1).addReplLoadSource(rload2).build(); 116 request.setLoad(sl); 117 master.getReplicationPeerManager().addPeer(peer1, 118 ReplicationPeerConfig.newBuilder().setClusterKey("test").build(), true); 119 master.getReplicationPeerManager().addPeer(peer2, 120 ReplicationPeerConfig.newBuilder().setClusterKey("test").build(), true); 121 master.getMasterRpcServices().regionServerReport(null, request.build()); 122 HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoad = 123 master.getReplicationLoad(new ServerName[] { serverName }); 124 assertEquals("peer size ", 2, replicationLoad.size()); 125 assertEquals("load size ", 1, replicationLoad.get(peer1).size()); 126 assertEquals("log queue size of peer1", sizeOfLogQueue, 127 replicationLoad.get(peer1).get(0).getSecond().getSizeOfLogQueue()); 128 assertEquals("replication lag of peer2", replicationLag + 1, 129 replicationLoad.get(peer2).get(0).getSecond().getReplicationLag()); 130 master.stopMaster(); 131 } 132}