001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.io.UncheckedIOException; 025import java.util.UUID; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseTestingUtility; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 032import org.apache.hadoop.hbase.client.Admin; 033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 034import org.apache.hadoop.hbase.client.RegionInfo; 035import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 036import org.apache.hadoop.hbase.regionserver.HRegionServer; 037import org.apache.hadoop.hbase.util.Bytes; 038import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 039import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader; 040import org.apache.hadoop.hbase.wal.WAL.Entry; 041import org.apache.hadoop.hbase.wal.WALFactory; 042import org.apache.hadoop.hbase.wal.WALProvider; 043import org.apache.hadoop.hbase.wal.WALStreamReader; 044import org.junit.After; 045import org.junit.AfterClass; 046import org.junit.BeforeClass; 047import org.junit.Rule; 048import org.junit.rules.TestName; 049 050/** 051 * Base class for testing serial replication. 052 */ 053public class SerialReplicationTestBase { 054 055 protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 056 057 protected static String PEER_ID = "1"; 058 059 protected static byte[] CF = Bytes.toBytes("CF"); 060 061 protected static byte[] CQ = Bytes.toBytes("CQ"); 062 063 protected static FileSystem FS; 064 065 protected static Path LOG_DIR; 066 067 protected static WALProvider.Writer WRITER; 068 069 @Rule 070 public final TestName name = new TestName(); 071 072 protected Path logPath; 073 074 public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint { 075 076 private static final UUID PEER_UUID = UTIL.getRandomUUID(); 077 078 @Override 079 public UUID getPeerUUID() { 080 return PEER_UUID; 081 } 082 083 @Override 084 public boolean replicate(ReplicateContext replicateContext) { 085 synchronized (WRITER) { 086 try { 087 for (Entry entry : replicateContext.getEntries()) { 088 WRITER.append(entry); 089 } 090 WRITER.sync(false); 091 } catch (IOException e) { 092 throw new UncheckedIOException(e); 093 } 094 } 095 return true; 096 } 097 098 @Override 099 public void start() { 100 startAsync(); 101 } 102 103 @Override 104 public void stop() { 105 stopAsync(); 106 } 107 108 @Override 109 protected void doStart() { 110 notifyStarted(); 111 } 112 113 @Override 114 protected void doStop() { 115 notifyStopped(); 116 } 117 118 @Override 119 public boolean canReplicateToSameCluster() { 120 return true; 121 } 122 } 123 124 @BeforeClass 125 public static void setUpBeforeClass() throws Exception { 126 UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10); 127 UTIL.getConfiguration().setLong("replication.sleep.before.failover", 1000); 128 UTIL.getConfiguration().setLong("hbase.serial.replication.waiting.ms", 100); 129 UTIL.startMiniCluster(3); 130 // disable balancer 131 UTIL.getAdmin().balancerSwitch(false, true); 132 LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated"); 133 FS = UTIL.getTestFileSystem(); 134 FS.mkdirs(LOG_DIR); 135 } 136 137 @AfterClass 138 public static void tearDownAfterClass() throws Exception { 139 UTIL.shutdownMiniCluster(); 140 } 141 142 @After 143 public void tearDown() throws Exception { 144 Admin admin = UTIL.getAdmin(); 145 for (ReplicationPeerDescription pd : admin.listReplicationPeers()) { 146 admin.removeReplicationPeer(pd.getPeerId()); 147 } 148 rollAllWALs(); 149 if (WRITER != null) { 150 WRITER.close(); 151 WRITER = null; 152 } 153 } 154 155 protected static void moveRegion(RegionInfo region, HRegionServer rs) throws Exception { 156 UTIL.getAdmin().move(region.getEncodedNameAsBytes(), rs.getServerName()); 157 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 158 159 @Override 160 public boolean evaluate() throws Exception { 161 return rs.getRegion(region.getEncodedName()) != null; 162 } 163 164 @Override 165 public String explainFailure() throws Exception { 166 return region + " is still not on " + rs; 167 } 168 }); 169 } 170 171 protected static void rollAllWALs() throws Exception { 172 for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) { 173 t.getRegionServer().getWalRoller().requestRollAll(); 174 } 175 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 176 177 @Override 178 public boolean evaluate() throws Exception { 179 return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream() 180 .map(RegionServerThread::getRegionServer).allMatch(HRegionServer::walRollRequestFinished); 181 } 182 183 @Override 184 public String explainFailure() throws Exception { 185 return "Log roll has not finished yet"; 186 } 187 }); 188 } 189 190 protected final void setupWALWriter() throws IOException { 191 logPath = new Path(LOG_DIR, name.getMethodName()); 192 WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration()); 193 } 194 195 protected final void waitUntilReplicationDone(int expectedEntries) throws Exception { 196 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 197 198 @Override 199 public boolean evaluate() throws Exception { 200 try { 201 return NoEOFWALStreamReader.count(FS, logPath, UTIL.getConfiguration()) 202 >= expectedEntries; 203 } catch (IOException e) { 204 return false; 205 } 206 } 207 208 @Override 209 public String explainFailure() throws Exception { 210 return "Not enough entries replicated"; 211 } 212 }); 213 } 214 215 protected final void enablePeerAndWaitUntilReplicationDone(int expectedEntries) throws Exception { 216 UTIL.getAdmin().enableReplicationPeer(PEER_ID); 217 waitUntilReplicationDone(expectedEntries); 218 } 219 220 protected final void addPeer(boolean enabled) throws IOException { 221 UTIL.getAdmin().addReplicationPeer(PEER_ID, 222 ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") 223 .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true) 224 .build(), 225 enabled); 226 } 227 228 protected final void checkOrder(int expectedEntries) throws IOException { 229 try (WALStreamReader reader = 230 NoEOFWALStreamReader.create(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { 231 long seqId = -1L; 232 int count = 0; 233 for (Entry entry;;) { 234 entry = reader.next(); 235 if (entry == null) { 236 break; 237 } 238 assertTrue( 239 "Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId(), 240 entry.getKey().getSequenceId() >= seqId); 241 seqId = entry.getKey().getSequenceId(); 242 count++; 243 } 244 assertEquals(expectedEntries, count); 245 } 246 } 247 248 protected final TableName createTable() throws IOException, InterruptedException { 249 TableName tableName = TableName.valueOf(name.getMethodName()); 250 UTIL.getAdmin().createTable( 251 TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder 252 .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); 253 UTIL.waitTableAvailable(tableName); 254 return tableName; 255 } 256}