001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
021import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
022import static org.junit.Assert.fail;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.HBaseConfiguration;
027import org.apache.hadoop.hbase.HBaseTestingUtil;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.Admin;
030import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
031import org.apache.hadoop.hbase.client.Get;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.client.Result;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.client.TableDescriptor;
036import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
037import org.apache.hadoop.hbase.testclassification.LargeTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
040import org.junit.AfterClass;
041import org.junit.BeforeClass;
042import org.junit.ClassRule;
043import org.junit.Test;
044import org.junit.experimental.categories.Category;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * Replication with dropped table will stuck as the default REPLICATION_DROP_ON_DELETED_TABLE_KEY is
050 * false.
051 */
052@Category({ LargeTests.class })
053public class TestReplicationStuckWithDroppedTable {
054
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057    HBaseClassTestRule.forClass(TestReplicationStuckWithDroppedTable.class);
058
059  private static final Logger LOG =
060    LoggerFactory.getLogger(TestReplicationEditsDroppedWithDroppedTable.class);
061
062  private static Configuration conf1 = HBaseConfiguration.create();
063  private static Configuration conf2 = HBaseConfiguration.create();
064
065  protected static HBaseTestingUtil utility1;
066  protected static HBaseTestingUtil utility2;
067
068  private static Admin admin1;
069  private static Admin admin2;
070
071  private static final TableName NORMAL_TABLE = TableName.valueOf("normal-table");
072  private static final TableName DROPPED_TABLE = TableName.valueOf("dropped-table");
073  private static final byte[] ROW = Bytes.toBytes("row");
074  private static final byte[] FAMILY = Bytes.toBytes("f");
075  private static final byte[] QUALIFIER = Bytes.toBytes("q");
076  private static final byte[] VALUE = Bytes.toBytes("value");
077
078  private static final String PEER_ID = "1";
079  private static final long SLEEP_TIME = 1000;
080  private static final int NB_RETRIES = 10;
081
082  @BeforeClass
083  public static void setUpBeforeClass() throws Exception {
084    conf1.set(ZOOKEEPER_ZNODE_PARENT, "/1");
085    conf1.setInt("replication.source.nb.capacity", 1);
086    utility1 = new HBaseTestingUtil(conf1);
087    utility1.startMiniZKCluster();
088    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
089    conf1 = utility1.getConfiguration();
090
091    conf2 = HBaseConfiguration.create(conf1);
092    conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2");
093    utility2 = new HBaseTestingUtil(conf2);
094    utility2.setZkCluster(miniZK);
095
096    utility1.startMiniCluster(1);
097    utility2.startMiniCluster(1);
098
099    admin1 = utility1.getAdmin();
100    admin2 = utility2.getAdmin();
101  }
102
103  @AfterClass
104  public static void tearDownAfterClass() throws Exception {
105    utility2.shutdownMiniCluster();
106    utility1.shutdownMiniCluster();
107  }
108
109  private void createTable(TableName tableName) throws Exception {
110    TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
111      .setColumnFamily(
112        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build())
113      .build();
114    admin1.createTable(desc);
115    admin2.createTable(desc);
116    utility1.waitUntilAllRegionsAssigned(tableName);
117    utility2.waitUntilAllRegionsAssigned(tableName);
118  }
119
120  @Test
121  public void testEditsStuckBehindDroppedTable() throws Exception {
122    // add peer
123    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
124      .setClusterKey(utility2.getRpcConnnectionURI()).setReplicateAllUserTables(true).build();
125    admin1.addReplicationPeer(PEER_ID, rpc);
126
127    // create table
128    createTable(NORMAL_TABLE);
129    createTable(DROPPED_TABLE);
130
131    admin1.disableReplicationPeer(PEER_ID);
132
133    try (Table droppedTable = utility1.getConnection().getTable(DROPPED_TABLE)) {
134      Put put = new Put(ROW);
135      put.addColumn(FAMILY, QUALIFIER, VALUE);
136      droppedTable.put(put);
137    }
138
139    admin1.disableTable(DROPPED_TABLE);
140    admin1.deleteTable(DROPPED_TABLE);
141    admin2.disableTable(DROPPED_TABLE);
142    admin2.deleteTable(DROPPED_TABLE);
143
144    admin1.enableReplicationPeer(PEER_ID);
145
146    verifyReplicationStuck();
147
148    // Remove peer
149    admin1.removeReplicationPeer(PEER_ID);
150    // Drop table
151    admin1.disableTable(NORMAL_TABLE);
152    admin1.deleteTable(NORMAL_TABLE);
153    admin2.disableTable(NORMAL_TABLE);
154    admin2.deleteTable(NORMAL_TABLE);
155  }
156
157  private void verifyReplicationStuck() throws Exception {
158    try (Table normalTable = utility1.getConnection().getTable(NORMAL_TABLE)) {
159      Put put = new Put(ROW);
160      put.addColumn(FAMILY, QUALIFIER, VALUE);
161      normalTable.put(put);
162    }
163    try (Table normalTable = utility2.getConnection().getTable(NORMAL_TABLE)) {
164      for (int i = 0; i < NB_RETRIES; i++) {
165        Result result = normalTable.get(new Get(ROW).addColumn(FAMILY, QUALIFIER));
166        if (result != null && !result.isEmpty()) {
167          fail("Edit should have been stuck behind dropped tables, but value is "
168            + Bytes.toString(result.getValue(FAMILY, QUALIFIER)));
169        } else {
170          LOG.info("Row not replicated, let's wait a bit more...");
171          Thread.sleep(SLEEP_TIME);
172        }
173      }
174    }
175  }
176}