001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.concurrent.TimeUnit; 030import java.util.stream.Collectors; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 035import org.apache.hadoop.hbase.client.Put; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.client.Table; 038import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 039import org.apache.hadoop.hbase.regionserver.HRegionServer; 040import org.apache.hadoop.hbase.testclassification.MediumTests; 041import org.apache.hadoop.hbase.testclassification.ReplicationTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 044import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader; 045import org.apache.hadoop.hbase.wal.WAL.Entry; 046import org.apache.hadoop.hbase.wal.WALStreamReader; 047import org.junit.Before; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051 052@Category({ ReplicationTests.class, MediumTests.class }) 053public class TestSerialReplication extends SerialReplicationTestBase { 054 055 @ClassRule 056 public static final HBaseClassTestRule CLASS_RULE = 057 HBaseClassTestRule.forClass(TestSerialReplication.class); 058 059 @Before 060 public void setUp() throws IOException, StreamLacksCapabilityException { 061 setupWALWriter(); 062 // add in disable state, so later when enabling it all sources will start push together. 063 addPeer(false); 064 } 065 066 @Test 067 public void testRegionMove() throws Exception { 068 TableName tableName = createTable(); 069 try (Table table = UTIL.getConnection().getTable(tableName)) { 070 for (int i = 0; i < 100; i++) { 071 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 072 } 073 } 074 RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); 075 HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName)); 076 moveRegion(region, rs); 077 try (Table table = UTIL.getConnection().getTable(tableName)) { 078 for (int i = 100; i < 200; i++) { 079 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 080 } 081 } 082 enablePeerAndWaitUntilReplicationDone(200); 083 checkOrder(200); 084 } 085 086 @Test 087 public void testRegionSplit() throws Exception { 088 TableName tableName = createTable(); 089 try (Table table = UTIL.getConnection().getTable(tableName)) { 090 for (int i = 0; i < 100; i++) { 091 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 092 } 093 } 094 UTIL.flush(tableName); 095 RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); 096 UTIL.getAdmin().splitRegionAsync(region.getEncodedNameAsBytes(), Bytes.toBytes(50)).get(30, 097 TimeUnit.SECONDS); 098 UTIL.waitUntilNoRegionsInTransition(30000); 099 List<RegionInfo> regions = UTIL.getAdmin().getRegions(tableName); 100 assertEquals(2, regions.size()); 101 try (Table table = UTIL.getConnection().getTable(tableName)) { 102 for (int i = 0; i < 100; i++) { 103 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 104 } 105 } 106 enablePeerAndWaitUntilReplicationDone(200); 107 Map<String, Long> regionsToSeqId = new HashMap<>(); 108 regionsToSeqId.put(region.getEncodedName(), -1L); 109 regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L)); 110 try (WALStreamReader reader = 111 NoEOFWALStreamReader.create(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { 112 int count = 0; 113 for (Entry entry;;) { 114 entry = reader.next(); 115 if (entry == null) { 116 break; 117 } 118 String encodedName = Bytes.toString(entry.getKey().getEncodedRegionName()); 119 Long seqId = regionsToSeqId.get(encodedName); 120 assertNotNull( 121 "Unexcepted entry " + entry + ", expected regions " + region + ", or " + regions, seqId); 122 assertTrue("Sequence id go backwards from " + seqId + " to " 123 + entry.getKey().getSequenceId() + " for " + encodedName, 124 entry.getKey().getSequenceId() >= seqId.longValue()); 125 if (count < 100) { 126 assertEquals(encodedName + " is pushed before parent " + region.getEncodedName(), 127 region.getEncodedName(), encodedName); 128 } else { 129 assertNotEquals(region.getEncodedName(), encodedName); 130 } 131 count++; 132 } 133 assertEquals(200, count); 134 } 135 } 136 137 @Test 138 public void testRegionMerge() throws Exception { 139 byte[] splitKey = Bytes.toBytes(50); 140 TableName tableName = TableName.valueOf(name.getMethodName()); 141 UTIL.getAdmin().createTable( 142 TableDescriptorBuilder.newBuilder(tableName) 143 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF) 144 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 145 .build(), 146 new byte[][] { splitKey }); 147 UTIL.waitTableAvailable(tableName); 148 try (Table table = UTIL.getConnection().getTable(tableName)) { 149 for (int i = 0; i < 100; i++) { 150 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 151 } 152 } 153 List<RegionInfo> regions = UTIL.getAdmin().getRegions(tableName); 154 UTIL.getAdmin() 155 .mergeRegionsAsync( 156 regions.stream().map(RegionInfo::getEncodedNameAsBytes).toArray(byte[][]::new), false) 157 .get(30, TimeUnit.SECONDS); 158 UTIL.waitUntilNoRegionsInTransition(30000); 159 List<RegionInfo> regionsAfterMerge = UTIL.getAdmin().getRegions(tableName); 160 assertEquals(1, regionsAfterMerge.size()); 161 try (Table table = UTIL.getConnection().getTable(tableName)) { 162 for (int i = 0; i < 100; i++) { 163 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 164 } 165 } 166 enablePeerAndWaitUntilReplicationDone(200); 167 Map<String, Long> regionsToSeqId = new HashMap<>(); 168 RegionInfo region = regionsAfterMerge.get(0); 169 regionsToSeqId.put(region.getEncodedName(), -1L); 170 regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L)); 171 try (WALStreamReader reader = 172 NoEOFWALStreamReader.create(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { 173 int count = 0; 174 for (Entry entry;;) { 175 entry = reader.next(); 176 if (entry == null) { 177 break; 178 } 179 String encodedName = Bytes.toString(entry.getKey().getEncodedRegionName()); 180 Long seqId = regionsToSeqId.get(encodedName); 181 assertNotNull( 182 "Unexcepted entry " + entry + ", expected regions " + region + ", or " + regions, seqId); 183 assertTrue("Sequence id go backwards from " + seqId + " to " 184 + entry.getKey().getSequenceId() + " for " + encodedName, 185 entry.getKey().getSequenceId() >= seqId.longValue()); 186 if (count < 100) { 187 assertNotEquals( 188 encodedName + " is pushed before parents " + regions.stream() 189 .map(RegionInfo::getEncodedName).collect(Collectors.joining(" and ")), 190 region.getEncodedName(), encodedName); 191 } else { 192 assertEquals(region.getEncodedName(), encodedName); 193 } 194 count++; 195 } 196 assertEquals(200, count); 197 } 198 } 199 200 @Test 201 public void testRemovePeerNothingReplicated() throws Exception { 202 TableName tableName = createTable(); 203 String encodedRegionName = 204 UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName(); 205 ReplicationQueueStorage queueStorage = 206 UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage(); 207 assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID)); 208 UTIL.getAdmin().removeReplicationPeer(PEER_ID); 209 assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID)); 210 } 211 212 @Test 213 public void testRemovePeer() throws Exception { 214 TableName tableName = createTable(); 215 try (Table table = UTIL.getConnection().getTable(tableName)) { 216 for (int i = 0; i < 100; i++) { 217 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 218 } 219 } 220 enablePeerAndWaitUntilReplicationDone(100); 221 checkOrder(100); 222 String encodedRegionName = 223 UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName(); 224 ReplicationQueueStorage queueStorage = 225 UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage(); 226 assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0); 227 UTIL.getAdmin().removeReplicationPeer(PEER_ID); 228 // confirm that we delete the last pushed sequence id 229 assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID)); 230 } 231 232 @Test 233 public void testRemoveSerialFlag() throws Exception { 234 TableName tableName = createTable(); 235 try (Table table = UTIL.getConnection().getTable(tableName)) { 236 for (int i = 0; i < 100; i++) { 237 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 238 } 239 } 240 enablePeerAndWaitUntilReplicationDone(100); 241 checkOrder(100); 242 String encodedRegionName = 243 UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName(); 244 ReplicationQueueStorage queueStorage = 245 UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage(); 246 assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0); 247 ReplicationPeerConfig peerConfig = UTIL.getAdmin().getReplicationPeerConfig(PEER_ID); 248 UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, 249 ReplicationPeerConfig.newBuilder(peerConfig).setSerial(false).build()); 250 // confirm that we delete the last pushed sequence id 251 assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID)); 252 } 253}