001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; 021import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertTrue; 024 025import java.io.File; 026import java.io.FileOutputStream; 027import java.io.IOException; 028import java.net.UnknownHostException; 029import java.util.List; 030import java.util.Map; 031import java.util.Optional; 032import java.util.concurrent.CountDownLatch; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicInteger; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FSDataOutputStream; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.hbase.Cell; 039import org.apache.hadoop.hbase.CellBuilderType; 040import org.apache.hadoop.hbase.ExtendedCellBuilder; 041import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HBaseTestingUtil; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.KeyValue; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.client.Admin; 048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 049import org.apache.hadoop.hbase.client.Connection; 050import org.apache.hadoop.hbase.client.ConnectionFactory; 051import org.apache.hadoop.hbase.client.Get; 052import org.apache.hadoop.hbase.client.Result; 053import org.apache.hadoop.hbase.client.Table; 054import org.apache.hadoop.hbase.client.TableDescriptor; 055import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 056import org.apache.hadoop.hbase.coprocessor.ObserverContext; 057import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 058import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 059import org.apache.hadoop.hbase.coprocessor.RegionObserver; 060import org.apache.hadoop.hbase.io.hfile.HFile; 061import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 062import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 063import org.apache.hadoop.hbase.replication.TestReplicationBase; 064import org.apache.hadoop.hbase.testclassification.MediumTests; 065import org.apache.hadoop.hbase.testclassification.ReplicationTests; 066import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; 067import org.apache.hadoop.hbase.util.Bytes; 068import org.apache.hadoop.hbase.util.Pair; 069import org.apache.hadoop.hdfs.MiniDFSCluster; 070import org.junit.After; 071import org.junit.Before; 072import org.junit.BeforeClass; 073import org.junit.ClassRule; 074import org.junit.Rule; 075import org.junit.Test; 076import org.junit.experimental.categories.Category; 077import org.junit.rules.TemporaryFolder; 078import org.junit.rules.TestName; 079import org.slf4j.Logger; 080import org.slf4j.LoggerFactory; 081 082/** 083 * Integration test for bulk load replication. Defines three clusters, with the following 084 * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between 2 085 * and 3). For each of defined test clusters, it performs a bulk load, asserting values on bulk 086 * loaded file gets replicated to other two peers. Since we are doing 3 bulk loads, with the given 087 * replication topology all these bulk loads should get replicated only once on each peer. To assert 088 * this, this test defines a preBulkLoad coprocessor and adds it to all test table regions, on each 089 * of the clusters. This CP counts the amount of times bulk load actually gets invoked, certifying 090 * we are not entering the infinite loop condition addressed by HBASE-22380. 091 */ 092@Category({ ReplicationTests.class, MediumTests.class }) 093public class TestBulkLoadReplication extends TestReplicationBase { 094 095 @ClassRule 096 public static final HBaseClassTestRule CLASS_RULE = 097 HBaseClassTestRule.forClass(TestBulkLoadReplication.class); 098 099 protected static final Logger LOG = LoggerFactory.getLogger(TestBulkLoadReplication.class); 100 101 private static final String PEER1_CLUSTER_ID = "peer1"; 102 private static final String PEER2_CLUSTER_ID = "peer2"; 103 private static final String PEER3_CLUSTER_ID = "peer3"; 104 105 private static final String PEER_ID1 = "1"; 106 private static final String PEER_ID3 = "3"; 107 108 private static AtomicInteger BULK_LOADS_COUNT; 109 private static CountDownLatch BULK_LOAD_LATCH; 110 111 protected static final HBaseTestingUtil UTIL3 = new HBaseTestingUtil(); 112 protected static final Configuration CONF3 = UTIL3.getConfiguration(); 113 114 private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir"); 115 116 private static Table htable3; 117 118 @Rule 119 public TestName name = new TestName(); 120 121 @ClassRule 122 public static TemporaryFolder testFolder = new TemporaryFolder(); 123 124 @BeforeClass 125 public static void setUpBeforeClass() throws Exception { 126 setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID); 127 setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID); 128 setupBulkLoadConfigsForCluster(CONF3, PEER3_CLUSTER_ID); 129 setupConfig(UTIL3, "/3"); 130 TestReplicationBase.setUpBeforeClass(); 131 startThirdCluster(); 132 } 133 134 private static void startThirdCluster() throws Exception { 135 LOG.info("Setup Zk to same one from UTIL1 and UTIL2"); 136 UTIL3.setZkCluster(UTIL1.getZkCluster()); 137 UTIL3.startMiniCluster(NUM_SLAVES1); 138 139 TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) 140 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMobEnabled(true) 141 .setMobThreshold(4000).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 142 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); 143 144 Connection connection3 = ConnectionFactory.createConnection(CONF3); 145 try (Admin admin3 = connection3.getAdmin()) { 146 admin3.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 147 } 148 UTIL3.waitUntilAllRegionsAssigned(tableName); 149 htable3 = connection3.getTable(tableName); 150 } 151 152 @Before 153 @Override 154 public void setUpBase() throws Exception { 155 // "super.setUpBase()" already sets replication from 1->2, 156 // then on the subsequent lines, sets 2->1, 2->3 and 3->2. 157 // So we have following topology: "1 <-> 2 <->3" 158 super.setUpBase(); 159 ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1); 160 ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2); 161 ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3); 162 // adds cluster1 as a remote peer on cluster2 163 UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config); 164 // adds cluster3 as a remote peer on cluster2 165 UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config); 166 // adds cluster2 as a remote peer on cluster3 167 UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config); 168 setupCoprocessor(UTIL1); 169 setupCoprocessor(UTIL2); 170 setupCoprocessor(UTIL3); 171 BULK_LOADS_COUNT = new AtomicInteger(0); 172 } 173 174 private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtil util) 175 throws UnknownHostException { 176 return ReplicationPeerConfig.newBuilder().setClusterKey(util.getRpcConnnectionURI()) 177 .setSerial(isSerialPeer()).build(); 178 } 179 180 private void setupCoprocessor(HBaseTestingUtil cluster) { 181 cluster.getHBaseCluster().getRegions(tableName).forEach(r -> { 182 try { 183 TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost() 184 .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class); 185 if (cp == null) { 186 r.getCoprocessorHost().load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0, 187 cluster.getConfiguration()); 188 cp = r.getCoprocessorHost() 189 .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class); 190 cp.clusterName = cluster.getRpcConnnectionURI(); 191 } 192 } catch (Exception e) { 193 LOG.error(e.getMessage(), e); 194 } 195 }); 196 } 197 198 @After 199 @Override 200 public void tearDownBase() throws Exception { 201 super.tearDownBase(); 202 UTIL2.getAdmin().removeReplicationPeer(PEER_ID1); 203 UTIL2.getAdmin().removeReplicationPeer(PEER_ID3); 204 UTIL3.getAdmin().removeReplicationPeer(PEER_ID2); 205 } 206 207 protected static void setupBulkLoadConfigsForCluster(Configuration config, 208 String clusterReplicationId) throws Exception { 209 config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); 210 config.set(REPLICATION_CLUSTER_ID, clusterReplicationId); 211 File sourceConfigFolder = testFolder.newFolder(clusterReplicationId); 212 File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() + "/hbase-site.xml"); 213 config.writeXml(new FileOutputStream(sourceConfigFile)); 214 config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath()); 215 } 216 217 @Test 218 public void testBulkLoadReplicationActiveActive() throws Exception { 219 Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName); 220 Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName); 221 Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName); 222 byte[] row = Bytes.toBytes("001"); 223 byte[] value = Bytes.toBytes("v1"); 224 assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable, peer2TestTable, 225 peer3TestTable); 226 row = Bytes.toBytes("002"); 227 value = Bytes.toBytes("v2"); 228 assertBulkLoadConditions(tableName, row, value, UTIL2, peer1TestTable, peer2TestTable, 229 peer3TestTable); 230 row = Bytes.toBytes("003"); 231 value = Bytes.toBytes("v3"); 232 assertBulkLoadConditions(tableName, row, value, UTIL3, peer1TestTable, peer2TestTable, 233 peer3TestTable); 234 // Additional wait to make sure no extra bulk load happens 235 Thread.sleep(400); 236 // We have 3 bulk load events (1 initiated on each cluster). 237 // Each event gets 3 counts (the originator cluster, plus the two peers), 238 // so BULK_LOADS_COUNT expected value is 3 * 3 = 9. 239 assertEquals(9, BULK_LOADS_COUNT.get()); 240 } 241 242 protected void assertBulkLoadConditions(TableName tableName, byte[] row, byte[] value, 243 HBaseTestingUtil utility, Table... tables) throws Exception { 244 BULK_LOAD_LATCH = new CountDownLatch(3); 245 bulkLoadOnCluster(tableName, row, value, utility); 246 assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES)); 247 assertTableHasValue(tables[0], row, value); 248 assertTableHasValue(tables[1], row, value); 249 assertTableHasValue(tables[2], row, value); 250 } 251 252 protected void bulkLoadOnCluster(TableName tableName, byte[] row, byte[] value, 253 HBaseTestingUtil cluster) throws Exception { 254 String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration()); 255 copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster()); 256 BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration()); 257 bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR); 258 } 259 260 private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception { 261 Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, "f"); 262 cluster.getFileSystem().mkdirs(bulkLoadDir); 263 cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir); 264 } 265 266 protected void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception { 267 Get get = new Get(row); 268 Result result = table.get(get); 269 assertTrue(result.advance()); 270 assertEquals(Bytes.toString(value), Bytes.toString(result.value())); 271 } 272 273 protected void assertTableNoValue(Table table, byte[] row, byte[] value) throws Exception { 274 Get get = new Get(row); 275 Result result = table.get(get); 276 assertTrue(result.isEmpty()); 277 } 278 279 private String createHFileForFamilies(byte[] row, byte[] value, Configuration clusterConfig) 280 throws IOException { 281 ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY); 282 cellBuilder.setRow(row).setFamily(TestReplicationBase.famName).setQualifier(Bytes.toBytes("1")) 283 .setValue(value).setType(Cell.Type.Put); 284 285 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig); 286 // TODO We need a way to do this without creating files 287 File hFileLocation = testFolder.newFile(); 288 FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null); 289 try { 290 hFileFactory.withOutputStream(out); 291 hFileFactory.withFileContext(new HFileContextBuilder().build()); 292 HFile.Writer writer = hFileFactory.create(); 293 try { 294 writer.append(new KeyValue(cellBuilder.build())); 295 } finally { 296 writer.close(); 297 } 298 } finally { 299 out.close(); 300 } 301 return hFileLocation.getAbsoluteFile().getAbsolutePath(); 302 } 303 304 public static class BulkReplicationTestObserver implements RegionCoprocessor { 305 306 String clusterName; 307 AtomicInteger bulkLoadCounts = new AtomicInteger(); 308 309 @Override 310 public Optional<RegionObserver> getRegionObserver() { 311 return Optional.of(new RegionObserver() { 312 313 @Override 314 public void postBulkLoadHFile(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, 315 List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths) 316 throws IOException { 317 BULK_LOAD_LATCH.countDown(); 318 BULK_LOADS_COUNT.incrementAndGet(); 319 LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName, 320 bulkLoadCounts.addAndGet(1)); 321 } 322 }); 323 } 324 } 325}