001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.fail; 024 025import java.io.IOException; 026import java.util.Arrays; 027import java.util.concurrent.CountDownLatch; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.HBaseTestingUtil; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.Admin; 037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 038import org.apache.hadoop.hbase.client.Connection; 039import org.apache.hadoop.hbase.client.ConnectionFactory; 040import org.apache.hadoop.hbase.client.Delete; 041import org.apache.hadoop.hbase.client.Get; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.Result; 044import org.apache.hadoop.hbase.client.Table; 045import org.apache.hadoop.hbase.client.TableDescriptor; 046import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 047import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 048import org.apache.hadoop.hbase.regionserver.HRegion; 049import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 050import org.apache.hadoop.hbase.testclassification.LargeTests; 051import org.apache.hadoop.hbase.testclassification.ReplicationTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 054import org.junit.BeforeClass; 055import org.junit.ClassRule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061@Category({ ReplicationTests.class, LargeTests.class }) 062public class TestMultiSlaveReplication { 063 064 @ClassRule 065 public static final HBaseClassTestRule CLASS_RULE = 066 HBaseClassTestRule.forClass(TestMultiSlaveReplication.class); 067 068 private static final Logger LOG = LoggerFactory.getLogger(TestMultiSlaveReplication.class); 069 070 private static Configuration conf1; 071 private static Configuration conf2; 072 private static Configuration conf3; 073 074 private static HBaseTestingUtil utility1; 075 private static HBaseTestingUtil utility2; 076 private static HBaseTestingUtil utility3; 077 private static final long SLEEP_TIME = 500; 078 private static final int NB_RETRIES = 100; 079 080 private static final TableName tableName = TableName.valueOf("test"); 081 private static final byte[] famName = Bytes.toBytes("f"); 082 private static final byte[] row = Bytes.toBytes("row"); 083 private static final byte[] row1 = Bytes.toBytes("row1"); 084 private static final byte[] row2 = Bytes.toBytes("row2"); 085 private static final byte[] row3 = Bytes.toBytes("row3"); 086 private static final byte[] noRepfamName = Bytes.toBytes("norep"); 087 088 private static TableDescriptor table; 089 090 @BeforeClass 091 public static void setUpBeforeClass() throws Exception { 092 conf1 = HBaseConfiguration.create(); 093 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); 094 // smaller block size and capacity to trigger more operations 095 // and test them 096 conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20); 097 conf1.setInt("replication.source.size.capacity", 1024); 098 conf1.setLong("replication.source.sleepforretries", 100); 099 conf1.setInt("hbase.regionserver.maxlogs", 10); 100 conf1.setLong("hbase.master.logcleaner.ttl", 10); 101 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 102 conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 103 "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter"); 104 conf1.setInt("hbase.master.cleaner.interval", 5 * 1000); 105 106 utility1 = new HBaseTestingUtil(conf1); 107 utility1.startMiniZKCluster(); 108 MiniZooKeeperCluster miniZK = utility1.getZkCluster(); 109 utility1.setZkCluster(miniZK); 110 111 conf2 = new Configuration(conf1); 112 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); 113 114 conf3 = new Configuration(conf1); 115 conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3"); 116 117 utility2 = new HBaseTestingUtil(conf2); 118 utility2.setZkCluster(miniZK); 119 120 utility3 = new HBaseTestingUtil(conf3); 121 utility3.setZkCluster(miniZK); 122 123 table = TableDescriptorBuilder.newBuilder(tableName) 124 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName) 125 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 126 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); 127 } 128 129 @Test 130 public void testMultiSlaveReplication() throws Exception { 131 LOG.info("testCyclicReplication"); 132 utility1.startMiniCluster(); 133 utility2.startMiniCluster(); 134 utility3.startMiniCluster(); 135 try (Connection conn = ConnectionFactory.createConnection(conf1); 136 Admin admin1 = conn.getAdmin()) { 137 utility1.getAdmin().createTable(table); 138 utility2.getAdmin().createTable(table); 139 utility3.getAdmin().createTable(table); 140 Table htable1 = utility1.getConnection().getTable(tableName); 141 Table htable2 = utility2.getConnection().getTable(tableName); 142 Table htable3 = utility3.getConnection().getTable(tableName); 143 144 ReplicationPeerConfigBuilder rpcBuilder = 145 ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI()); 146 admin1.addReplicationPeer("1", rpcBuilder.build()); 147 148 // put "row" and wait 'til it got around, then delete 149 putAndWait(row, famName, htable1, htable2); 150 deleteAndWait(row, htable1, htable2); 151 // check it wasn't replication to cluster 3 152 checkRow(row, 0, htable3); 153 154 putAndWait(row2, famName, htable1, htable2); 155 156 // now roll the region server's logs 157 rollWALAndWait(utility1, htable1.getName(), row2); 158 159 // after the log was rolled put a new row 160 putAndWait(row3, famName, htable1, htable2); 161 162 rpcBuilder.setClusterKey(utility3.getRpcConnnectionURI()); 163 admin1.addReplicationPeer("2", rpcBuilder.build()); 164 165 // put a row, check it was replicated to all clusters 166 putAndWait(row1, famName, htable1, htable2, htable3); 167 // delete and verify 168 deleteAndWait(row1, htable1, htable2, htable3); 169 170 // make sure row2 did not get replicated after 171 // cluster 3 was added 172 checkRow(row2, 0, htable3); 173 174 // row3 will get replicated, because it was in the 175 // latest log 176 checkRow(row3, 1, htable3); 177 178 Put p = new Put(row); 179 p.addColumn(famName, row, row); 180 htable1.put(p); 181 // now roll the logs again 182 rollWALAndWait(utility1, htable1.getName(), row); 183 184 // cleanup "row2", also conveniently use this to wait replication 185 // to finish 186 deleteAndWait(row2, htable1, htable2, htable3); 187 // Even if the log was rolled in the middle of the replication 188 // "row" is still replication. 189 checkRow(row, 1, htable2); 190 // Replication thread of cluster 2 may be sleeping, and since row2 is not there in it, 191 // we should wait before checking. 192 checkWithWait(row, 1, htable3); 193 194 // cleanup the rest 195 deleteAndWait(row, htable1, htable2, htable3); 196 deleteAndWait(row3, htable1, htable2, htable3); 197 198 utility3.shutdownMiniCluster(); 199 utility2.shutdownMiniCluster(); 200 utility1.shutdownMiniCluster(); 201 } 202 } 203 204 private void rollWALAndWait(final HBaseTestingUtil utility, final TableName table, 205 final byte[] row) throws IOException { 206 final Admin admin = utility.getAdmin(); 207 final SingleProcessHBaseCluster cluster = utility.getMiniHBaseCluster(); 208 209 // find the region that corresponds to the given row. 210 HRegion region = null; 211 for (HRegion candidate : cluster.getRegions(table)) { 212 if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) { 213 region = candidate; 214 break; 215 } 216 } 217 assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region); 218 219 final CountDownLatch latch = new CountDownLatch(1); 220 221 // listen for successful log rolls 222 final WALActionsListener listener = new WALActionsListener() { 223 @Override 224 public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { 225 latch.countDown(); 226 } 227 }; 228 region.getWAL().registerWALActionsListener(listener); 229 230 // request a roll 231 admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDescriptor().getTableName(), 232 region.getRegionInfo().getRegionName())); 233 234 // wait 235 try { 236 latch.await(); 237 } catch (InterruptedException exception) { 238 LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " 239 + "replication tests fail, it's probably because we should still be waiting."); 240 Thread.currentThread().interrupt(); 241 } 242 region.getWAL().unregisterWALActionsListener(listener); 243 } 244 245 private void checkWithWait(byte[] row, int count, Table table) throws Exception { 246 Get get = new Get(row); 247 for (int i = 0; i < NB_RETRIES; i++) { 248 if (i == NB_RETRIES - 1) { 249 fail("Waited too much time while getting the row."); 250 } 251 boolean rowReplicated = false; 252 Result res = table.get(get); 253 if (res.size() >= 1) { 254 LOG.info("Row is replicated"); 255 rowReplicated = true; 256 assertEquals("Table '" + table + "' did not have the expected number of results.", count, 257 res.size()); 258 break; 259 } 260 if (rowReplicated) { 261 break; 262 } else { 263 Thread.sleep(SLEEP_TIME); 264 } 265 } 266 } 267 268 private void checkRow(byte[] row, int count, Table... tables) throws IOException { 269 Get get = new Get(row); 270 for (Table table : tables) { 271 Result res = table.get(get); 272 assertEquals("Table '" + table + "' did not have the expected number of results.", count, 273 res.size()); 274 } 275 } 276 277 private void deleteAndWait(byte[] row, Table source, Table... targets) throws Exception { 278 Delete del = new Delete(row); 279 source.delete(del); 280 281 Get get = new Get(row); 282 for (int i = 0; i < NB_RETRIES; i++) { 283 if (i == NB_RETRIES - 1) { 284 fail("Waited too much time for del replication"); 285 } 286 boolean removedFromAll = true; 287 for (Table target : targets) { 288 Result res = target.get(get); 289 if (res.size() >= 1) { 290 LOG.info("Row not deleted"); 291 removedFromAll = false; 292 break; 293 } 294 } 295 if (removedFromAll) { 296 break; 297 } else { 298 Thread.sleep(SLEEP_TIME); 299 } 300 } 301 } 302 303 private void putAndWait(byte[] row, byte[] fam, Table source, Table... targets) throws Exception { 304 Put put = new Put(row); 305 put.addColumn(fam, row, row); 306 source.put(put); 307 308 Get get = new Get(row); 309 for (int i = 0; i < NB_RETRIES; i++) { 310 if (i == NB_RETRIES - 1) { 311 fail("Waited too much time for put replication"); 312 } 313 boolean replicatedToAll = true; 314 for (Table target : targets) { 315 Result res = target.get(get); 316 if (res.isEmpty()) { 317 LOG.info("Row not available"); 318 replicatedToAll = false; 319 break; 320 } else { 321 assertArrayEquals(res.value(), row); 322 } 323 } 324 if (replicatedToAll) { 325 break; 326 } else { 327 Thread.sleep(SLEEP_TIME); 328 } 329 } 330 } 331 332}