/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

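/**
 * Tests the global replication source buffer quota ({@code replication.total.buffer.quota}).
 * Three peers replicate to the same slave cluster with a very small quota, and a watcher
 * thread verifies that the total buffer used by the replication sources on a region server
 * stays within the expected bound while edits are shipped.
 */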
@Category({ ReplicationTests.class, LargeTests.class })
public class TestGlobalReplicationThrottler {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestGlobalReplicationThrottler.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestGlobalReplicationThrottler.class);
  private static final int REPLICATION_SOURCE_QUOTA = 200;
  private static int numOfPeer = 0;
  private static Configuration conf1;
  private static Configuration conf2;

  private static HBaseTestingUtility utility1;
  private static HBaseTestingUtility utility2;

  private static final byte[] famName = Bytes.toBytes("f");
  private static final byte[] VALUE = Bytes.toBytes("v");
  private static final byte[] ROW = Bytes.toBytes("r");
  private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, 100);

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf1 = HBaseConfiguration.create();
    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
    conf1.setLong("replication.source.sleepforretries", 100);
    // Each WAL entry is about 120 bytes
    conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, REPLICATION_SOURCE_QUOTA);
    conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);
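    // With a total buffer quota of only 200 bytes, entries of roughly 120 bytes, and a
    // deliberately tiny per-peer bandwidth, the replication sources fill the shared buffer
    // quickly and drain it slowly, which gives the watcher thread in testQuota() time to
    // observe the throttling.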

    utility1 = new HBaseTestingUtility(conf1);
    utility1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
    new ZKWatcher(conf1, "cluster1", null, true);

    conf2 = new Configuration(conf1);
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");

    utility2 = new HBaseTestingUtility(conf2);
    utility2.setZkCluster(miniZK);
    new ZKWatcher(conf2, "cluster2", null, true);

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(utility2.getClusterKey());

    utility1.startMiniCluster();
    utility2.startMiniCluster();

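    // Register three peers that all point at the same slave cluster. Each peer gets its own
    // replication source on the source region server, so every WAL entry is buffered once per
    // peer against the shared "replication.total.buffer.quota".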
    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
    admin1.addPeer("peer1", rpc, null);
    admin1.addPeer("peer2", rpc, null);
    admin1.addPeer("peer3", rpc, null);
    numOfPeer = admin1.getPeersCount();
    admin1.close();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    utility2.shutdownMiniCluster();
    utility1.shutdownMiniCluster();
  }

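  // Set by the watcher thread: testQuotaPass stays true only if the observed buffer usage
  // never exceeds the expected bound; testQuotaNonZero records that a non-zero buffer usage
  // was observed at least once.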
  private volatile boolean testQuotaPass = false;
  private volatile boolean testQuotaNonZero = false;

  @Test
  public void testQuota() throws IOException {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    HTableDescriptor table = new HTableDescriptor(tableName);
    HColumnDescriptor fam = new HColumnDescriptor(famName);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    table.addFamily(fam);
    utility1.getAdmin().createTable(table);
    utility2.getAdmin().createTable(table);

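    // Background thread that samples the total buffer used by the replication sources on the
    // source region server while edits are shipped; the assertions at the end of the test
    // check the flags it sets.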
    Thread watcher = new Thread(() -> {
      Replication replication = (Replication) utility1.getMiniHBaseCluster().getRegionServer(0)
        .getReplicationSourceService();
      testQuotaPass = true;
      while (!Thread.interrupted()) {
        long size = replication.getReplicationManager().getTotalBufferUsed();
        if (size > 0) {
          testQuotaNonZero = true;
        }
        // The threshold uses "numOfPeer + 1" because addEntryToBatch() keeps the last entry in
        // a batch even when adding it pushes the batch size over the quota, so the total used
        // buffer size can be one "replication.total.buffer.quota" larger than expected.
        if (size > REPLICATION_SOURCE_QUOTA * (numOfPeer + 1)) {
          // We read logs first and check the throttler afterwards, so if the buffer quota
          // limiter doesn't take effect, many logs will be pushed and the quota will be
          // exceeded.
          testQuotaPass = false;
        }
        Threads.sleep(50);
      }
    });
    watcher.start();

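    // Load 50 rows into the source table, then poll the table on the slave cluster for up to
    // three minutes until all 50 rows have arrived.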
    try (Table t1 = utility1.getConnection().getTable(tableName);
      Table t2 = utility2.getConnection().getTable(tableName)) {
      for (int i = 0; i < 50; i++) {
        Put put = new Put(ROWS[i]);
        put.addColumn(famName, VALUE, VALUE);
        t1.put(put);
      }
      long start = EnvironmentEdgeManager.currentTime();
      while (EnvironmentEdgeManager.currentTime() - start < 180000) {
        Scan scan = new Scan();
        scan.setCaching(50);
        int count = 0;
        try (ResultScanner results = t2.getScanner(scan)) {
          for (Result result : results) {
            count++;
          }
        }
        if (count < 50) {
          LOG.info("Waiting for all edits to be pushed to the slave. Expected 50, actual " + count);
          Threads.sleep(200);
          continue;
        }
        break;
      }
    }

    watcher.interrupt();
    Assert.assertTrue(testQuotaPass);
    Assert.assertTrue(testQuotaNonZero);
  }

}