001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022 023import java.io.IOException; 024import java.io.UncheckedIOException; 025import java.util.UUID; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellUtil; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtil; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.Table; 038import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 039import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; 040import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 041import org.apache.hadoop.hbase.testclassification.MediumTests; 042import org.apache.hadoop.hbase.testclassification.ReplicationTests; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 045import org.apache.hadoop.hbase.wal.WAL.Entry; 046import org.apache.hadoop.hbase.wal.WALFactory; 047import org.apache.hadoop.hbase.wal.WALProvider; 048import org.apache.hadoop.hbase.wal.WALStreamReader; 049import org.junit.AfterClass; 050import org.junit.BeforeClass; 051import org.junit.ClassRule; 052import org.junit.Test; 053import org.junit.experimental.categories.Category; 054 055/** 056 * Testcase for HBASE-20624. 057 */ 058@Category({ ReplicationTests.class, MediumTests.class }) 059public class TestRaceWhenCreatingReplicationSource { 060 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestRaceWhenCreatingReplicationSource.class); 064 065 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 066 067 private static String PEER_ID = "1"; 068 069 private static TableName TABLE_NAME = TableName.valueOf("race"); 070 071 private static byte[] CF = Bytes.toBytes("CF"); 072 073 private static byte[] CQ = Bytes.toBytes("CQ"); 074 075 private static FileSystem FS; 076 077 private static Path LOG_PATH; 078 079 private static WALProvider.Writer WRITER; 080 081 private static volatile boolean NULL_UUID = true; 082 083 public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint { 084 085 private static final UUID PEER_UUID = UTIL.getRandomUUID(); 086 087 @Override 088 public UUID getPeerUUID() { 089 if (NULL_UUID) { 090 return null; 091 } else { 092 return PEER_UUID; 093 } 094 } 095 096 @Override 097 public boolean replicate(ReplicateContext replicateContext) { 098 synchronized (WRITER) { 099 try { 100 for (Entry entry : replicateContext.getEntries()) { 101 WRITER.append(entry); 102 } 103 WRITER.sync(false); 104 } catch (IOException e) { 105 throw new UncheckedIOException(e); 106 } 107 } 108 return true; 109 } 110 111 @Override 112 public void start() { 113 startAsync(); 114 } 115 116 @Override 117 public void stop() { 118 stopAsync(); 119 } 120 121 @Override 122 protected void doStart() { 123 notifyStarted(); 124 } 125 126 @Override 127 protected void doStop() { 128 notifyStopped(); 129 } 130 131 @Override 132 public boolean canReplicateToSameCluster() { 133 return true; 134 } 135 } 136 137 @BeforeClass 138 public static void setUpBeforeClass() throws Exception { 139 UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "multiwal"); 140 // make sure that we will create a new group for the table 141 UTIL.getConfiguration().setInt("hbase.wal.regiongrouping.numgroups", 8); 142 UTIL.startMiniCluster(3); 143 Path dir = UTIL.getDataTestDirOnTestFS(); 144 FS = UTIL.getTestFileSystem(); 145 LOG_PATH = new Path(dir, "replicated"); 146 WRITER = WALFactory.createWALWriter(FS, LOG_PATH, UTIL.getConfiguration()); 147 UTIL.getAdmin().addReplicationPeer(PEER_ID, 148 ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") 149 .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(), 150 true); 151 } 152 153 @AfterClass 154 public static void tearDownAfterClass() throws Exception { 155 UTIL.shutdownMiniCluster(); 156 } 157 158 @Test 159 public void testRace() throws Exception { 160 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 161 162 @Override 163 public boolean evaluate() throws Exception { 164 for (RegionServerThread t : UTIL.getMiniHBaseCluster().getRegionServerThreads()) { 165 ReplicationSource source = 166 (ReplicationSource) ((Replication) t.getRegionServer().getReplicationSourceService()) 167 .getReplicationManager().getSource(PEER_ID); 168 if (source == null || source.getReplicationEndpoint() == null) { 169 return false; 170 } 171 } 172 return true; 173 } 174 175 @Override 176 public String explainFailure() throws Exception { 177 return "Replication source has not been initialized yet"; 178 } 179 }); 180 UTIL.getAdmin().createTable( 181 TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder 182 .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); 183 UTIL.waitTableAvailable(TABLE_NAME); 184 try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) { 185 table.put(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, Bytes.toBytes(1))); 186 } 187 NULL_UUID = false; 188 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 189 190 @Override 191 public boolean evaluate() throws Exception { 192 try (WALStreamReader reader = 193 WALFactory.createStreamReader(FS, LOG_PATH, UTIL.getConfiguration())) { 194 return reader.next() != null; 195 } catch (IOException e) { 196 return false; 197 } 198 } 199 200 @Override 201 public String explainFailure() throws Exception { 202 return "Replication has not catched up"; 203 } 204 }); 205 try (WALStreamReader reader = 206 WALFactory.createStreamReader(FS, LOG_PATH, UTIL.getConfiguration())) { 207 Cell cell = reader.next().getEdit().getCells().get(0); 208 assertEquals(1, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); 209 assertArrayEquals(CF, CellUtil.cloneFamily(cell)); 210 assertArrayEquals(CQ, CellUtil.cloneQualifier(cell)); 211 assertEquals(1, 212 Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); 213 } 214 } 215}