/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.REPLICATION_DROP_ON_DELETED_TABLE_KEY;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for testing replication for dropped tables.
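 * <p>
 * A minimal sketch of how a concrete test might drive these helpers is shown below. The class and
 * method names in the example are illustrative only and are not taken from an existing test:
 *
 * <pre>
 * public class TestReplicationBehindDroppedTable extends ReplicationDroppedTablesTestBase {
 *   &#64;BeforeClass
 *   public static void setUpBeforeClass() throws Exception {
 *     // allowProceeding = true: the source peer drops edits for deleted tables
 *     setupClusters(true);
 *   }
 *
 *   &#64;Test
 *   public void testEditsDroppedWithDroppedTable() throws Exception {
 *     testEditsBehindDroppedTable("test_dropped");
 *   }
 * }
 * </pre>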
 */
public class ReplicationDroppedTablesTestBase extends TestReplicationBase {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationDroppedTablesTestBase.class);

  // Number of rows written to the normal test table.
  protected static final int ROWS_COUNT = 1000;

  // Large random value used in every put, so the replication WAL entries exceed the RPC max
  // request size and are split across several batches.
  protected static byte[] VALUE;

  // Whether the source peer is configured to drop edits for deleted tables instead of blocking.
  private static boolean ALLOW_PROCEEDING;

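  /**
   * Starts the source and sink mini clusters with a single region server each and a small RPC max
   * request size, so that replication edits are split across several batches.
   * @param allowProceeding whether the source peer should drop edits for deleted tables and let
   *          replication proceed past them
   */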
  protected static void setupClusters(boolean allowProceeding) throws Exception {
    // Set a small max request size so that the replication WAL entries are divided into multiple
    // batches. The default max request size is 256M, which would put all replication entries into
    // a single batch; at the sink side a batch is applied per table, so the edits of the test
    // table could be applied before those of the test_dropped table and we would wrongly conclude
    // that replication is not stuck (HBASE-20475).
    // We used to use 10K, but the regionServerReport in this test exceeds that limit and makes the
    // test fail, so use 64K instead.
    CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 64 * 1024);
    // Use a large value so that the replication WAL entries are split into several batches.
    VALUE = new byte[4096];
    ThreadLocalRandom.current().nextBytes(VALUE);
    // Make sure we have only a single region server, so that all edits for all tables go to it.
    NUM_SLAVES1 = 1;
    NUM_SLAVES2 = 1;
    ALLOW_PROCEEDING = allowProceeding;
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
    CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
    TestReplicationBase.setUpBeforeClass();
  }

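  /**
   * Returns the row key for the given put id on the normal test table.
   */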
  protected final byte[] generateRowKey(int id) {
    return Bytes.toBytes(String.format("NormalPut%03d", id));
  }

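  /**
   * Creates the given table on both clusters, queues edits for it behind a disabled peer, drops
   * the table on both clusters, re-enables the peer and then verifies whether replication proceeds
   * or stays stuck, depending on how the clusters were set up.
   */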
  protected final void testEditsBehindDroppedTable(String tName) throws Exception {
    TableName tablename = TableName.valueOf(tName);
    byte[] familyName = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    TableDescriptor table =
      TableDescriptorBuilder.newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();

    Connection connection1 = ConnectionFactory.createConnection(UTIL1.getConfiguration());
    Connection connection2 = ConnectionFactory.createConnection(UTIL2.getConfiguration());
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    UTIL1.waitUntilAllRegionsAssigned(tablename);
    UTIL2.waitUntilAllRegionsAssigned(tablename);

    // now suspend replication
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableReplicationPeer(PEER_ID2);
    }
    // Put one row on the table that is going to be dropped; lead the row key with 0 so this edit
    // sorts ahead of the other table's edits in the replication batch.
    try (Table droppedTable = connection1.getTable(tablename)) {
      byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
      Put put = new Put(rowKey);
      put.addColumn(familyName, row, VALUE);
      droppedTable.put(put);
    }

    // Write a bunch of edits to the normal table, making sure we fill at least one batch.
    try (Table table1 = connection1.getTable(tableName)) {
      for (int i = 0; i < ROWS_COUNT; i++) {
        Put put = new Put(generateRowKey(i)).addColumn(famName, row, VALUE);
        table1.put(put);
      }
    }

    // Drop the table on both the source and the sink cluster.
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableTable(tablename);
      admin1.deleteTable(tablename);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    // Resume replication now that the table is gone.
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.enableReplicationPeer(PEER_ID2);
    }

    if (ALLOW_PROCEEDING) {
      // in this case we'd expect the edits to make it over to the sink
      verifyReplicationProceeded();
    } else {
      verifyReplicationStuck();
    }
  }

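  /**
   * Returns whether all {@link #ROWS_COUNT} rows of the normal table are present on the sink, and
   * asserts that the rows that are present have the expected keys.
   */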
  private boolean peerHasAllNormalRows() throws IOException {
    try (ResultScanner scanner = htable2.getScanner(new Scan())) {
      Result[] results = scanner.next(ROWS_COUNT);
      if (results.length != ROWS_COUNT) {
        return false;
      }
      for (int i = 0; i < results.length; i++) {
        Assert.assertArrayEquals(generateRowKey(i), results[i].getRow());
      }
      return true;
    }
  }

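  /**
   * Waits until all normal rows have been replicated to the sink, failing the test if they do not
   * show up within the retry budget.
   */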
  protected final void verifyReplicationProceeded() throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      if (!peerHasAllNormalRows()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

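  /**
   * Verifies that replication stays stuck: the normal rows must not show up on the sink for the
   * whole retry period.
   */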
  protected final void verifyReplicationStuck() throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      if (peerHasAllNormalRows()) {
        fail("Edit should have been stuck behind dropped tables");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }
  }
}