001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
060
061@Category({ ReplicationTests.class, LargeTests.class })
062public class TestMultiSlaveReplication {
063
064  @ClassRule
065  public static final HBaseClassTestRule CLASS_RULE =
066    HBaseClassTestRule.forClass(TestMultiSlaveReplication.class);
067
068  private static final Logger LOG = LoggerFactory.getLogger(TestMultiSlaveReplication.class);
069
070  private static Configuration conf1;
071  private static Configuration conf2;
072  private static Configuration conf3;
073
074  private static HBaseTestingUtil utility1;
075  private static HBaseTestingUtil utility2;
076  private static HBaseTestingUtil utility3;
077  private static final long SLEEP_TIME = 500;
078  private static final int NB_RETRIES = 100;
079
080  private static final TableName tableName = TableName.valueOf("test");
081  private static final byte[] famName = Bytes.toBytes("f");
082  private static final byte[] row = Bytes.toBytes("row");
083  private static final byte[] row1 = Bytes.toBytes("row1");
084  private static final byte[] row2 = Bytes.toBytes("row2");
085  private static final byte[] row3 = Bytes.toBytes("row3");
086  private static final byte[] noRepfamName = Bytes.toBytes("norep");
087
088  private static TableDescriptor table;
089
090  @BeforeClass
091  public static void setUpBeforeClass() throws Exception {
092    conf1 = HBaseConfiguration.create();
093    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
094    // smaller block size and capacity to trigger more operations
095    // and test them
096    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
097    conf1.setInt("replication.source.size.capacity", 1024);
098    conf1.setLong("replication.source.sleepforretries", 100);
099    conf1.setInt("hbase.regionserver.maxlogs", 10);
100    conf1.setLong("hbase.master.logcleaner.ttl", 10);
101    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
102    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
103      "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
104    conf1.setInt("hbase.master.cleaner.interval", 5 * 1000);
105
106    utility1 = new HBaseTestingUtil(conf1);
107    utility1.startMiniZKCluster();
108    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
109    utility1.setZkCluster(miniZK);
110
111    conf2 = new Configuration(conf1);
112    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
113
114    conf3 = new Configuration(conf1);
115    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
116
117    utility2 = new HBaseTestingUtil(conf2);
118    utility2.setZkCluster(miniZK);
119
120    utility3 = new HBaseTestingUtil(conf3);
121    utility3.setZkCluster(miniZK);
122
123    table = TableDescriptorBuilder.newBuilder(tableName)
124      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
125        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
126      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
127  }
128
129  @Test
130  public void testMultiSlaveReplication() throws Exception {
131    LOG.info("testCyclicReplication");
132    utility1.startMiniCluster();
133    utility2.startMiniCluster();
134    utility3.startMiniCluster();
135    try (Connection conn = ConnectionFactory.createConnection(conf1);
136      Admin admin1 = conn.getAdmin()) {
137      utility1.getAdmin().createTable(table);
138      utility2.getAdmin().createTable(table);
139      utility3.getAdmin().createTable(table);
140      Table htable1 = utility1.getConnection().getTable(tableName);
141      Table htable2 = utility2.getConnection().getTable(tableName);
142      Table htable3 = utility3.getConnection().getTable(tableName);
143
144      ReplicationPeerConfigBuilder rpcBuilder =
145        ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI());
146      admin1.addReplicationPeer("1", rpcBuilder.build());
147
148      // put "row" and wait 'til it got around, then delete
149      putAndWait(row, famName, htable1, htable2);
150      deleteAndWait(row, htable1, htable2);
151      // check it wasn't replication to cluster 3
152      checkRow(row, 0, htable3);
153
154      putAndWait(row2, famName, htable1, htable2);
155
156      // now roll the region server's logs
157      rollWALAndWait(utility1, htable1.getName(), row2);
158
159      // after the log was rolled put a new row
160      putAndWait(row3, famName, htable1, htable2);
161
162      rpcBuilder.setClusterKey(utility3.getRpcConnnectionURI());
163      admin1.addReplicationPeer("2", rpcBuilder.build());
164
165      // put a row, check it was replicated to all clusters
166      putAndWait(row1, famName, htable1, htable2, htable3);
167      // delete and verify
168      deleteAndWait(row1, htable1, htable2, htable3);
169
170      // make sure row2 did not get replicated after
171      // cluster 3 was added
172      checkRow(row2, 0, htable3);
173
174      // row3 will get replicated, because it was in the
175      // latest log
176      checkRow(row3, 1, htable3);
177
178      Put p = new Put(row);
179      p.addColumn(famName, row, row);
180      htable1.put(p);
181      // now roll the logs again
182      rollWALAndWait(utility1, htable1.getName(), row);
183
184      // cleanup "row2", also conveniently use this to wait replication
185      // to finish
186      deleteAndWait(row2, htable1, htable2, htable3);
187      // Even if the log was rolled in the middle of the replication
188      // "row" is still replication.
189      checkRow(row, 1, htable2);
190      // Replication thread of cluster 2 may be sleeping, and since row2 is not there in it,
191      // we should wait before checking.
192      checkWithWait(row, 1, htable3);
193
194      // cleanup the rest
195      deleteAndWait(row, htable1, htable2, htable3);
196      deleteAndWait(row3, htable1, htable2, htable3);
197
198      utility3.shutdownMiniCluster();
199      utility2.shutdownMiniCluster();
200      utility1.shutdownMiniCluster();
201    }
202  }
203
204  private void rollWALAndWait(final HBaseTestingUtil utility, final TableName table,
205    final byte[] row) throws IOException {
206    final Admin admin = utility.getAdmin();
207    final SingleProcessHBaseCluster cluster = utility.getMiniHBaseCluster();
208
209    // find the region that corresponds to the given row.
210    HRegion region = null;
211    for (HRegion candidate : cluster.getRegions(table)) {
212      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
213        region = candidate;
214        break;
215      }
216    }
217    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);
218
219    final CountDownLatch latch = new CountDownLatch(1);
220
221    // listen for successful log rolls
222    final WALActionsListener listener = new WALActionsListener() {
223      @Override
224      public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
225        latch.countDown();
226      }
227    };
228    region.getWAL().registerWALActionsListener(listener);
229
230    // request a roll
231    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDescriptor().getTableName(),
232      region.getRegionInfo().getRegionName()));
233
234    // wait
235    try {
236      latch.await();
237    } catch (InterruptedException exception) {
238      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later "
239        + "replication tests fail, it's probably because we should still be waiting.");
240      Thread.currentThread().interrupt();
241    }
242    region.getWAL().unregisterWALActionsListener(listener);
243  }
244
245  private void checkWithWait(byte[] row, int count, Table table) throws Exception {
246    Get get = new Get(row);
247    for (int i = 0; i < NB_RETRIES; i++) {
248      if (i == NB_RETRIES - 1) {
249        fail("Waited too much time while getting the row.");
250      }
251      boolean rowReplicated = false;
252      Result res = table.get(get);
253      if (res.size() >= 1) {
254        LOG.info("Row is replicated");
255        rowReplicated = true;
256        assertEquals("Table '" + table + "' did not have the expected number of  results.", count,
257          res.size());
258        break;
259      }
260      if (rowReplicated) {
261        break;
262      } else {
263        Thread.sleep(SLEEP_TIME);
264      }
265    }
266  }
267
268  private void checkRow(byte[] row, int count, Table... tables) throws IOException {
269    Get get = new Get(row);
270    for (Table table : tables) {
271      Result res = table.get(get);
272      assertEquals("Table '" + table + "' did not have the expected number of results.", count,
273        res.size());
274    }
275  }
276
277  private void deleteAndWait(byte[] row, Table source, Table... targets) throws Exception {
278    Delete del = new Delete(row);
279    source.delete(del);
280
281    Get get = new Get(row);
282    for (int i = 0; i < NB_RETRIES; i++) {
283      if (i == NB_RETRIES - 1) {
284        fail("Waited too much time for del replication");
285      }
286      boolean removedFromAll = true;
287      for (Table target : targets) {
288        Result res = target.get(get);
289        if (res.size() >= 1) {
290          LOG.info("Row not deleted");
291          removedFromAll = false;
292          break;
293        }
294      }
295      if (removedFromAll) {
296        break;
297      } else {
298        Thread.sleep(SLEEP_TIME);
299      }
300    }
301  }
302
303  private void putAndWait(byte[] row, byte[] fam, Table source, Table... targets) throws Exception {
304    Put put = new Put(row);
305    put.addColumn(fam, row, row);
306    source.put(put);
307
308    Get get = new Get(row);
309    for (int i = 0; i < NB_RETRIES; i++) {
310      if (i == NB_RETRIES - 1) {
311        fail("Waited too much time for put replication");
312      }
313      boolean replicatedToAll = true;
314      for (Table target : targets) {
315        Result res = target.get(get);
316        if (res.isEmpty()) {
317          LOG.info("Row not available");
318          replicatedToAll = false;
319          break;
320        } else {
321          assertArrayEquals(res.value(), row);
322        }
323      }
324      if (replicatedToAll) {
325        break;
326      } else {
327        Thread.sleep(SLEEP_TIME);
328      }
329    }
330  }
331
332}