/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

/**
 * Tests the replication shipping path under an artificially small RPC request size limit
 * (10&nbsp;KB, set in {@link #setUpBeforeClass()}), so that each 8&nbsp;KB row queued on the
 * source cluster must be shipped to the sink in its own batch. One test verifies the expected
 * batch count; the other verifies that replication still completes when the endpoint injects
 * failures on alternating batches.
 */
@Category(MediumTests.class)
public class TestReplicator extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicator.class);

  static final Logger LOG = LoggerFactory.getLogger(TestReplicator.class);
  static final int NUM_ROWS = 10;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Set RPC size limit to 10kb (will be applied to both source and sink clusters)
    CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
    TestReplicationBase.setUpBeforeClass();
  }

  /**
   * Queues NUM_ROWS puts of 8KB each while replication is paused, then resumes and verifies that,
   * because of the 10KB RPC size limit, the endpoint shipped exactly one batch per row.
   */
  @Test
  public void testReplicatorBatching() throws Exception {
    // Clear the tables
    truncateTable(UTIL1, tableName);
    truncateTable(UTIL2, tableName);

    // Replace the peer set up for us by the base class with a wrapper for this test
    hbaseAdmin.addReplicationPeer("testReplicatorBatching",
      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI())
        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()).build());

    ReplicationEndpointForTest.setBatchCount(0);
    ReplicationEndpointForTest.setEntriesCount(0);
    try {
      ReplicationEndpointForTest.pause();
      try {
        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
        // have to be replicated separately.
        final byte[] valueBytes = new byte[8 * 1024];
        for (int i = 0; i < NUM_ROWS; i++) {
          htable1.put(new Put(Bytes.toBytes("row" + Integer.toString(i))).addColumn(famName, null,
            valueBytes));
        }
      } finally {
        ReplicationEndpointForTest.resume();
      }

      // Wait for replication to complete.
      Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          LOG.info("Count=" + ReplicationEndpointForTest.getBatchCount());
          return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS;
        }

        @Override
        public String explainFailure() throws Exception {
          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
        }
      });

      assertEquals("We sent an incorrect number of batches", NUM_ROWS,
        ReplicationEndpointForTest.getBatchCount());
      assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2));
    } finally {
      hbaseAdmin.removeReplicationPeer("testReplicatorBatching");
    }
  }

  /**
   * Same setup as {@link #testReplicatorBatching()}, but the endpoint fails every other batch
   * with an injected ServiceException. Replication must retry and eventually deliver all
   * NUM_ROWS rows to the sink.
   */
  @Test
  public void testReplicatorWithErrors() throws Exception {
    // Clear the tables
    truncateTable(UTIL1, tableName);
    truncateTable(UTIL2, tableName);

    // Replace the peer set up for us by the base class with a wrapper for this test
    hbaseAdmin.addReplicationPeer("testReplicatorWithErrors",
      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI())
        .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName())
        .build());

    FailureInjectingReplicationEndpointForTest.setBatchCount(0);
    FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
    try {
      FailureInjectingReplicationEndpointForTest.pause();
      try {
        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
        // have to be replicated separately.
        final byte[] valueBytes = new byte[8 * 1024];
        for (int i = 0; i < NUM_ROWS; i++) {
          htable1.put(new Put(Bytes.toBytes("row" + Integer.toString(i))).addColumn(famName, null,
            valueBytes));
        }
      } finally {
        FailureInjectingReplicationEndpointForTest.resume();
      }

      // Wait for replication to complete.
      // We can expect 10 batches
      Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
        }

        @Override
        public String explainFailure() throws Exception {
          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
        }
      });

      assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2));
    } finally {
      hbaseAdmin.removeReplicationPeer("testReplicatorWithErrors");
    }
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TestReplicationBase.tearDownAfterClass();
  }

  /**
   * Disables and truncates the given table on the given cluster.
   * @param util      the mini-cluster utility whose Admin is used
   * @param tablename the table to disable and truncate
   */
  private void truncateTable(HBaseTestingUtil util, TableName tablename) throws IOException {
    Admin admin = util.getAdmin();
    // Use the method parameter here; the original disabled the inherited tableName field,
    // silently ignoring the argument for the disable step.
    admin.disableTable(tablename);
    admin.truncateTable(tablename, false);
  }

  /**
   * Replication endpoint that counts replicated batches and entries, and can be paused/resumed
   * so the test can queue rows before any of them ship. Counters are static because the region
   * server instantiates the endpoint itself; the test only sees the class.
   */
  public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {

    // Number of batches shipped (each whenComplete callback may run on an arbitrary thread).
    protected static AtomicInteger batchCount = new AtomicInteger(0);
    // Number of WAL entries shipped. Atomic because it is also incremented from completion
    // callbacks; the original plain int was a data race.
    protected static AtomicInteger entriesCount = new AtomicInteger(0);
    private static final Object latch = new Object();
    private static AtomicBoolean useLatch = new AtomicBoolean(false);

    /** Releases any threads blocked in {@link #await()} and stops future pausing. */
    public static void resume() {
      useLatch.set(false);
      synchronized (latch) {
        latch.notifyAll();
      }
    }

    /** Makes subsequent {@link #replicate} calls block in {@link #await()} until resumed. */
    public static void pause() {
      useLatch.set(true);
    }

    /**
     * Blocks while paused. The flag is rechecked in a loop under the latch's monitor: the
     * original checked useLatch outside the monitor and waited without a loop, so a resume()
     * racing the check could fire notifyAll() before wait() and hang the shipper (and a
     * spurious wakeup could release it early).
     */
    public static void await() throws InterruptedException {
      if (useLatch.get()) {
        LOG.info("Waiting on latch");
        synchronized (latch) {
          while (useLatch.get()) {
            latch.wait();
          }
        }
        LOG.info("Waited on latch, now proceeding");
      }
    }

    public static int getBatchCount() {
      return batchCount.get();
    }

    public static void setBatchCount(int i) {
      LOG.info("SetBatchCount=" + i + ", old=" + getBatchCount());
      batchCount.set(i);
    }

    public static int getEntriesCount() {
      return entriesCount.get();
    }

    public static void setEntriesCount(int i) {
      LOG.info("SetEntriesCount=" + i);
      entriesCount.set(i);
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      try {
        await();
      } catch (InterruptedException e) {
        LOG.warn("Interrupted waiting for latch", e);
        // Restore the interrupt status so callers up the stack can observe it.
        Thread.currentThread().interrupt();
      }
      return super.replicate(replicateContext);
    }

    @Override
    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
      int timeout) {
      // Count entries/batches once the shipment completes (success or failure).
      return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
        entriesCount.addAndGet(entries.size());
        int count = batchCount.incrementAndGet();
        LOG.info(
          "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
      });
    }
  }

  /**
   * Endpoint that fails every other batch with an injected ServiceException, exercising the
   * shipper's retry path.
   */
  public static class FailureInjectingReplicationEndpointForTest
    extends ReplicationEndpointForTest {
    // Alternates: false -> ship this batch (and arm failure), true -> fail this batch.
    private final AtomicBoolean failNext = new AtomicBoolean(false);

    @Override
    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
      int timeout) {
      if (failNext.compareAndSet(false, true)) {
        return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
          entriesCount.addAndGet(entries.size());
          int count = batchCount.incrementAndGet();
          LOG.info(
            "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
        });
      } else if (failNext.compareAndSet(true, false)) {
        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
        future.completeExceptionally(new ServiceException("Injected failure"));
        return future;
      }
      // Unreachable with a two-state toggle, but keeps the method total.
      return CompletableFuture.completedFuture(ordinal);
    }
  }
}