001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertTrue; 021 022import java.io.IOException; 023import java.util.Collections; 024import org.apache.hadoop.fs.Path; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.TableName; 027import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 028import org.apache.hadoop.hbase.client.Put; 029import org.apache.hadoop.hbase.client.RegionInfo; 030import org.apache.hadoop.hbase.client.Table; 031import org.apache.hadoop.hbase.client.TableState; 032import org.apache.hadoop.hbase.master.TableStateManager; 033import org.apache.hadoop.hbase.regionserver.HRegionServer; 034import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 035import org.apache.hadoop.hbase.replication.regionserver.Replication; 036import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; 037import org.apache.hadoop.hbase.testclassification.LargeTests; 038import org.apache.hadoop.hbase.testclassification.ReplicationTests; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 041import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 042import org.junit.Before; 043import org.junit.ClassRule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046 047import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 048 049/** 050 * Testcase for HBASE-20147. 051 */ 052@Category({ ReplicationTests.class, LargeTests.class }) 053public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase { 054 055 @ClassRule 056 public static final HBaseClassTestRule CLASS_RULE = 057 HBaseClassTestRule.forClass(TestAddToSerialReplicationPeer.class); 058 059 @Before 060 public void setUp() throws IOException, StreamLacksCapabilityException { 061 setupWALWriter(); 062 } 063 064 // make sure that we will start replication for the sequence id after move, that's what we want to 065 // test here. 066 private void moveRegionAndArchiveOldWals(RegionInfo region, HRegionServer rs) throws Exception { 067 moveRegion(region, rs); 068 rollAllWALs(); 069 } 070 071 private void waitUntilReplicatedToTheCurrentWALFile(HRegionServer rs, final String oldWalName) 072 throws Exception { 073 Path path = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName(); 074 String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(path.getName()); 075 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 076 077 @Override 078 public boolean evaluate() throws Exception { 079 ReplicationSourceManager manager = 080 ((Replication) rs.getReplicationSourceService()).getReplicationManager(); 081 // Make sure replication moves to the new file. 082 ReplicationQueueId queueId = new ReplicationQueueId(rs.getServerName(), PEER_ID); 083 return (manager.getWALs().get(queueId).get(logPrefix).size() == 1) 084 && !oldWalName.equals(manager.getWALs().get(queueId).get(logPrefix).first()); 085 } 086 087 @Override 088 public String explainFailure() throws Exception { 089 return "Still not replicated to the current WAL file yet"; 090 } 091 }); 092 } 093 094 @Test 095 public void testAddPeer() throws Exception { 096 TableName tableName = createTable(); 097 try (Table table = UTIL.getConnection().getTable(tableName)) { 098 for (int i = 0; i < 100; i++) { 099 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 100 } 101 } 102 RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); 103 HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName)); 104 moveRegionAndArchiveOldWals(region, rs); 105 addPeer(true); 106 try (Table table = UTIL.getConnection().getTable(tableName)) { 107 for (int i = 0; i < 100; i++) { 108 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 109 } 110 } 111 waitUntilReplicationDone(100); 112 checkOrder(100); 113 } 114 115 @Test 116 public void testChangeToSerial() throws Exception { 117 ReplicationPeerConfig peerConfig = 118 ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") 119 .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(); 120 UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true); 121 122 TableName tableName = createTable(); 123 try (Table table = UTIL.getConnection().getTable(tableName)) { 124 for (int i = 0; i < 100; i++) { 125 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 126 } 127 } 128 129 RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); 130 HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName); 131 // Get the current wal file name 132 String walFileNameBeforeRollover = 133 ((AbstractFSWAL<?>) srcRs.getWAL(null)).getCurrentFileName().getName(); 134 135 HRegionServer rs = UTIL.getOtherRegionServer(srcRs); 136 moveRegionAndArchiveOldWals(region, rs); 137 waitUntilReplicationDone(100); 138 waitUntilReplicatedToTheCurrentWALFile(srcRs, walFileNameBeforeRollover); 139 140 UTIL.getAdmin().disableReplicationPeer(PEER_ID); 141 UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, 142 ReplicationPeerConfig.newBuilder(peerConfig).setSerial(true).build()); 143 UTIL.getAdmin().enableReplicationPeer(PEER_ID); 144 145 try (Table table = UTIL.getConnection().getTable(tableName)) { 146 for (int i = 0; i < 100; i++) { 147 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 148 } 149 } 150 waitUntilReplicationDone(200); 151 checkOrder(200); 152 } 153 154 @Test 155 public void testAddToSerialPeer() throws Exception { 156 ReplicationPeerConfig peerConfig = 157 ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") 158 .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()) 159 .setReplicateAllUserTables(false).setSerial(true).build(); 160 UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true); 161 162 TableName tableName = createTable(); 163 try (Table table = UTIL.getConnection().getTable(tableName)) { 164 for (int i = 0; i < 100; i++) { 165 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 166 } 167 } 168 RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); 169 HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName); 170 HRegionServer rs = UTIL.getOtherRegionServer(srcRs); 171 172 // Get the current wal file name 173 String walFileNameBeforeRollover = 174 ((AbstractFSWAL<?>) srcRs.getWAL(null)).getCurrentFileName().getName(); 175 176 moveRegionAndArchiveOldWals(region, rs); 177 178 // Make sure that the replication done for the oldWal at source rs. 179 waitUntilReplicatedToTheCurrentWALFile(srcRs, walFileNameBeforeRollover); 180 181 UTIL.getAdmin().disableReplicationPeer(PEER_ID); 182 UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, 183 ReplicationPeerConfig.newBuilder(peerConfig) 184 .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).build()); 185 UTIL.getAdmin().enableReplicationPeer(PEER_ID); 186 try (Table table = UTIL.getConnection().getTable(tableName)) { 187 for (int i = 0; i < 100; i++) { 188 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 189 } 190 } 191 waitUntilReplicationDone(100); 192 checkOrder(100); 193 } 194 195 @Test 196 public void testDisabledTable() throws Exception { 197 TableName tableName = createTable(); 198 try (Table table = UTIL.getConnection().getTable(tableName)) { 199 for (int i = 0; i < 100; i++) { 200 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 201 } 202 } 203 UTIL.getAdmin().disableTable(tableName); 204 rollAllWALs(); 205 addPeer(true); 206 UTIL.getAdmin().enableTable(tableName); 207 try (Table table = UTIL.getConnection().getTable(tableName)) { 208 for (int i = 0; i < 100; i++) { 209 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 210 } 211 } 212 waitUntilReplicationDone(100); 213 checkOrder(100); 214 } 215 216 @Test 217 public void testDisablingTable() throws Exception { 218 TableName tableName = createTable(); 219 try (Table table = UTIL.getConnection().getTable(tableName)) { 220 for (int i = 0; i < 100; i++) { 221 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 222 } 223 } 224 UTIL.getAdmin().disableTable(tableName); 225 rollAllWALs(); 226 TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager(); 227 tsm.setTableState(tableName, TableState.State.DISABLING); 228 Thread t = new Thread(() -> { 229 try { 230 addPeer(true); 231 } catch (IOException e) { 232 throw new RuntimeException(e); 233 } 234 }); 235 t.start(); 236 Thread.sleep(5000); 237 // we will wait on the disabling table so the thread should still be alive. 238 assertTrue(t.isAlive()); 239 tsm.setTableState(tableName, TableState.State.DISABLED); 240 t.join(); 241 UTIL.getAdmin().enableTable(tableName); 242 try (Table table = UTIL.getConnection().getTable(tableName)) { 243 for (int i = 0; i < 100; i++) { 244 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 245 } 246 } 247 waitUntilReplicationDone(100); 248 checkOrder(100); 249 } 250 251 @Test 252 public void testEnablingTable() throws Exception { 253 TableName tableName = createTable(); 254 try (Table table = UTIL.getConnection().getTable(tableName)) { 255 for (int i = 0; i < 100; i++) { 256 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 257 } 258 } 259 RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); 260 HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName)); 261 moveRegionAndArchiveOldWals(region, rs); 262 TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager(); 263 tsm.setTableState(tableName, TableState.State.ENABLING); 264 Thread t = new Thread(() -> { 265 try { 266 addPeer(true); 267 } catch (IOException e) { 268 throw new RuntimeException(e); 269 } 270 }); 271 t.start(); 272 Thread.sleep(5000); 273 // we will wait on the disabling table so the thread should still be alive. 274 assertTrue(t.isAlive()); 275 tsm.setTableState(tableName, TableState.State.ENABLED); 276 t.join(); 277 try (Table table = UTIL.getConnection().getTable(tableName)) { 278 for (int i = 0; i < 100; i++) { 279 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 280 } 281 } 282 waitUntilReplicationDone(100); 283 checkOrder(100); 284 } 285}