001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import java.util.Collections;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
028import org.apache.hadoop.hbase.client.Put;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.client.Table;
031import org.apache.hadoop.hbase.client.TableState;
032import org.apache.hadoop.hbase.master.TableStateManager;
033import org.apache.hadoop.hbase.regionserver.HRegionServer;
034import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
035import org.apache.hadoop.hbase.replication.regionserver.Replication;
036import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
037import org.apache.hadoop.hbase.testclassification.LargeTests;
038import org.apache.hadoop.hbase.testclassification.ReplicationTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
041import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
042import org.junit.Before;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046
047import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
048
049/**
050 * Testcase for HBASE-20147.
051 */
052@Category({ ReplicationTests.class, LargeTests.class })
053public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
054
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057    HBaseClassTestRule.forClass(TestAddToSerialReplicationPeer.class);
058
059  @Before
060  public void setUp() throws IOException, StreamLacksCapabilityException {
061    setupWALWriter();
062  }
063
064  // make sure that we will start replication for the sequence id after move, that's what we want to
065  // test here.
066  private void moveRegionAndArchiveOldWals(RegionInfo region, HRegionServer rs) throws Exception {
067    moveRegion(region, rs);
068    rollAllWALs();
069  }
070
071  private void waitUntilReplicatedToTheCurrentWALFile(HRegionServer rs, final String oldWalName)
072    throws Exception {
073    Path path = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName();
074    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(path.getName());
075    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
076
077      @Override
078      public boolean evaluate() throws Exception {
079        ReplicationSourceManager manager =
080          ((Replication) rs.getReplicationSourceService()).getReplicationManager();
081        // Make sure replication moves to the new file.
082        return (manager.getWALs().get(PEER_ID).get(logPrefix).size() == 1)
083          && !oldWalName.equals(manager.getWALs().get(PEER_ID).get(logPrefix).first());
084      }
085
086      @Override
087      public String explainFailure() throws Exception {
088        return "Still not replicated to the current WAL file yet";
089      }
090    });
091  }
092
093  @Test
094  public void testAddPeer() throws Exception {
095    TableName tableName = createTable();
096    try (Table table = UTIL.getConnection().getTable(tableName)) {
097      for (int i = 0; i < 100; i++) {
098        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
099      }
100    }
101    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
102    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
103    moveRegionAndArchiveOldWals(region, rs);
104    addPeer(true);
105    try (Table table = UTIL.getConnection().getTable(tableName)) {
106      for (int i = 0; i < 100; i++) {
107        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
108      }
109    }
110    waitUntilReplicationDone(100);
111    checkOrder(100);
112  }
113
114  @Test
115  public void testChangeToSerial() throws Exception {
116    ReplicationPeerConfig peerConfig =
117      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
118        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build();
119    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
120
121    TableName tableName = createTable();
122    try (Table table = UTIL.getConnection().getTable(tableName)) {
123      for (int i = 0; i < 100; i++) {
124        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
125      }
126    }
127
128    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
129    HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
130    // Get the current wal file name
131    String walFileNameBeforeRollover =
132      ((AbstractFSWAL<?>) srcRs.getWAL(null)).getCurrentFileName().getName();
133
134    HRegionServer rs = UTIL.getOtherRegionServer(srcRs);
135    moveRegionAndArchiveOldWals(region, rs);
136    waitUntilReplicationDone(100);
137    waitUntilReplicatedToTheCurrentWALFile(srcRs, walFileNameBeforeRollover);
138
139    UTIL.getAdmin().disableReplicationPeer(PEER_ID);
140    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
141      ReplicationPeerConfig.newBuilder(peerConfig).setSerial(true).build());
142    UTIL.getAdmin().enableReplicationPeer(PEER_ID);
143
144    try (Table table = UTIL.getConnection().getTable(tableName)) {
145      for (int i = 0; i < 100; i++) {
146        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
147      }
148    }
149    waitUntilReplicationDone(200);
150    checkOrder(200);
151  }
152
153  @Test
154  public void testAddToSerialPeer() throws Exception {
155    ReplicationPeerConfig peerConfig =
156      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
157        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
158        .setReplicateAllUserTables(false).setSerial(true).build();
159    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
160
161    TableName tableName = createTable();
162    try (Table table = UTIL.getConnection().getTable(tableName)) {
163      for (int i = 0; i < 100; i++) {
164        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
165      }
166    }
167    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
168    HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
169    HRegionServer rs = UTIL.getOtherRegionServer(srcRs);
170
171    // Get the current wal file name
172    String walFileNameBeforeRollover =
173      ((AbstractFSWAL<?>) srcRs.getWAL(null)).getCurrentFileName().getName();
174
175    moveRegionAndArchiveOldWals(region, rs);
176
177    // Make sure that the replication done for the oldWal at source rs.
178    waitUntilReplicatedToTheCurrentWALFile(srcRs, walFileNameBeforeRollover);
179
180    UTIL.getAdmin().disableReplicationPeer(PEER_ID);
181    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
182      ReplicationPeerConfig.newBuilder(peerConfig)
183        .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).build());
184    UTIL.getAdmin().enableReplicationPeer(PEER_ID);
185    try (Table table = UTIL.getConnection().getTable(tableName)) {
186      for (int i = 0; i < 100; i++) {
187        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
188      }
189    }
190    waitUntilReplicationDone(100);
191    checkOrder(100);
192  }
193
194  @Test
195  public void testDisabledTable() throws Exception {
196    TableName tableName = createTable();
197    try (Table table = UTIL.getConnection().getTable(tableName)) {
198      for (int i = 0; i < 100; i++) {
199        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
200      }
201    }
202    UTIL.getAdmin().disableTable(tableName);
203    rollAllWALs();
204    addPeer(true);
205    UTIL.getAdmin().enableTable(tableName);
206    try (Table table = UTIL.getConnection().getTable(tableName)) {
207      for (int i = 0; i < 100; i++) {
208        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
209      }
210    }
211    waitUntilReplicationDone(100);
212    checkOrder(100);
213  }
214
215  @Test
216  public void testDisablingTable() throws Exception {
217    TableName tableName = createTable();
218    try (Table table = UTIL.getConnection().getTable(tableName)) {
219      for (int i = 0; i < 100; i++) {
220        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
221      }
222    }
223    UTIL.getAdmin().disableTable(tableName);
224    rollAllWALs();
225    TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
226    tsm.setTableState(tableName, TableState.State.DISABLING);
227    Thread t = new Thread(() -> {
228      try {
229        addPeer(true);
230      } catch (IOException e) {
231        throw new RuntimeException(e);
232      }
233    });
234    t.start();
235    Thread.sleep(5000);
236    // we will wait on the disabling table so the thread should still be alive.
237    assertTrue(t.isAlive());
238    tsm.setTableState(tableName, TableState.State.DISABLED);
239    t.join();
240    UTIL.getAdmin().enableTable(tableName);
241    try (Table table = UTIL.getConnection().getTable(tableName)) {
242      for (int i = 0; i < 100; i++) {
243        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
244      }
245    }
246    waitUntilReplicationDone(100);
247    checkOrder(100);
248  }
249
250  @Test
251  public void testEnablingTable() throws Exception {
252    TableName tableName = createTable();
253    try (Table table = UTIL.getConnection().getTable(tableName)) {
254      for (int i = 0; i < 100; i++) {
255        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
256      }
257    }
258    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
259    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
260    moveRegionAndArchiveOldWals(region, rs);
261    TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager();
262    tsm.setTableState(tableName, TableState.State.ENABLING);
263    Thread t = new Thread(() -> {
264      try {
265        addPeer(true);
266      } catch (IOException e) {
267        throw new RuntimeException(e);
268      }
269    });
270    t.start();
271    Thread.sleep(5000);
272    // we will wait on the disabling table so the thread should still be alive.
273    assertTrue(t.isAlive());
274    tsm.setTableState(tableName, TableState.State.ENABLED);
275    t.join();
276    try (Table table = UTIL.getConnection().getTable(tableName)) {
277      for (int i = 0; i < 100; i++) {
278        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
279      }
280    }
281    waitUntilReplicationDone(100);
282    checkOrder(100);
283  }
284}