/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

/**
 * Verifies that a bulk load into a column family, table or namespace that a replication peer
 * excludes neither shows up on the peer cluster nor leaves an entry in the source cluster's HFile
 * reference queue.
 */
@Category({ ReplicationTests.class, SmallTests.class })
public class TestBulkLoadReplicationHFileRefs extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBulkLoadReplicationHFileRefs.class);

  private static final String PEER1_CLUSTER_ID = "peer1";
  private static final String PEER2_CLUSTER_ID = "peer2";

  private static final String REPLICATE_NAMESPACE = "replicate_ns";
  private static final String NO_REPLICATE_NAMESPACE = "no_replicate_ns";
  private static final TableName REPLICATE_TABLE =
    TableName.valueOf(REPLICATE_NAMESPACE, "replicate_table");
  private static final TableName NO_REPLICATE_TABLE =
    TableName.valueOf(NO_REPLICATE_NAMESPACE, "no_replicate_table");
  private static final byte[] CF_A = Bytes.toBytes("cfa");
  private static final byte[] CF_B = Bytes.toBytes("cfb");

  // The single cell written by every bulk load in this class; the tests read it back by this row.
  private final byte[] row = Bytes.toBytes("r1");
  private final byte[] qualifier = Bytes.toBytes("q1");
  private final byte[] value = Bytes.toBytes("v1");

  @ClassRule
  public static TemporaryFolder testFolder = new TemporaryFolder();

  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");

  private static Admin admin1;
  private static Admin admin2;

  private static ReplicationQueueStorage queueStorage;

  /**
   * Enables bulk load replication on both cluster configurations, starts the clusters through
   * {@link TestReplicationBase#setUpBeforeClass()} and creates the two test namespaces on each of
   * them.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // The bulk load settings must be in place before the base class starts the clusters.
    setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
    setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
    TestReplicationBase.setUpBeforeClass();
    admin1 = UTIL1.getConnection().getAdmin();
    admin2 = UTIL2.getConnection().getAdmin();

    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getZooKeeperWatcher(),
      UTIL1.getConfiguration());

    admin1.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build());
    admin2.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build());
    admin1.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build());
    admin2.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build());
  }

  /**
   * Turns on bulk load replication in {@code config}, tags it with the given replication cluster
   * id, and writes the configuration as {@code hbase-site.xml} into a folder named after that id
   * under {@link #testFolder}. The root of {@link #testFolder} is then set as the replication
   * configuration directory.
   * @param config               the cluster configuration to modify
   * @param clusterReplicationId the replication cluster id, also used as the folder name
   * @throws Exception if the folder or the configuration file cannot be written
   */
  protected static void setupBulkLoadConfigsForCluster(Configuration config,
    String clusterReplicationId) throws Exception {
    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
    File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
    File sourceConfigFile = new File(sourceConfigFolder, "hbase-site.xml");
    // Close the stream explicitly; writeXml does not own it.
    try (FileOutputStream out = new FileOutputStream(sourceConfigFile)) {
      config.writeXml(out);
    }
    config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
  }

  /** Removes every replication peer from the source cluster so each test adds its own. */
  @Before
  public void setUp() throws Exception {
    for (ReplicationPeerDescription peer : admin1.listReplicationPeers()) {
      admin1.removeReplicationPeer(peer.getPeerId());
    }
  }

  /** Removes every replication peer from the source cluster and deletes all tables on both. */
  @After
  public void teardown() throws Exception {
    for (ReplicationPeerDescription peer : admin1.listReplicationPeers()) {
      admin1.removeReplicationPeer(peer.getPeerId());
    }
    for (TableName tableName : admin1.listTableNames()) {
      UTIL1.deleteTable(tableName);
    }
    for (TableName tableName : admin2.listTableNames()) {
      UTIL2.deleteTable(tableName);
    }
  }

  /** A bulk load into an excluded column family must not be replicated or queued. */
  @Test
  public void testWhenExcludeCF() throws Exception {
    // Create table in source and remote clusters.
    createTableOnClusters(REPLICATE_TABLE, CF_A, CF_B);
    // Add peer, setReplicateAllUserTables true, but exclude CF_B.
    Map<TableName, List<String>> excludeTableCFs = Maps.newHashMap();
    excludeTableCFs.put(REPLICATE_TABLE, Lists.newArrayList(Bytes.toString(CF_B)));
    ReplicationPeerConfig peerConfig =
      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
        .setReplicateAllUserTables(true).setExcludeTableCFsMap(excludeTableCFs).build();
    admin1.addReplicationPeer(PEER_ID2, peerConfig);
    Assert.assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
    Assert.assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
    Assert.assertFalse(peerConfig.needToReplicate(REPLICATE_TABLE, CF_B));

    assertEquals(0, queueStorage.getAllHFileRefs().size());

    // Bulk load data into the CF that is not replicated.
    bulkLoadOnCluster(REPLICATE_TABLE, CF_B);
    Threads.sleep(1000);

    // Cannot get data from remote cluster
    try (Table table2 = UTIL2.getConnection().getTable(REPLICATE_TABLE)) {
      Result result = table2.get(new Get(row));
      assertTrue(Bytes.equals(null, result.getValue(CF_B, qualifier)));
    }
    // The extra HFile is never added to the HFileRefs
    assertEquals(0, queueStorage.getAllHFileRefs().size());
  }

  /** A bulk load into an excluded table must not be replicated or queued. */
  @Test
  public void testWhenExcludeTable() throws Exception {
    // Create 2 tables in source and remote clusters.
    createTableOnClusters(REPLICATE_TABLE, CF_A);
    createTableOnClusters(NO_REPLICATE_TABLE, CF_A);

    // Add peer, setReplicateAllUserTables true, but exclude one table.
    Map<TableName, List<String>> excludeTableCFs = Maps.newHashMap();
    excludeTableCFs.put(NO_REPLICATE_TABLE, null);
    ReplicationPeerConfig peerConfig =
      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
        .setReplicateAllUserTables(true).setExcludeTableCFsMap(excludeTableCFs).build();
    admin1.addReplicationPeer(PEER_ID2, peerConfig);
    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE));
    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A));

    assertEquals(0, queueStorage.getAllHFileRefs().size());

    // Bulk load data into the table that is not replicated.
    bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A);
    Threads.sleep(1000);

    // Cannot get data from remote cluster
    try (Table table2 = UTIL2.getConnection().getTable(NO_REPLICATE_TABLE)) {
      Result result = table2.get(new Get(row));
      assertTrue(Bytes.equals(null, result.getValue(CF_A, qualifier)));
    }

    // The extra HFile is never added to the HFileRefs
    assertEquals(0, queueStorage.getAllHFileRefs().size());
  }

  /** A bulk load into a table of an excluded namespace must not be replicated or queued. */
  @Test
  public void testWhenExcludeNamespace() throws Exception {
    // Create 2 tables in source and remote clusters.
    createTableOnClusters(REPLICATE_TABLE, CF_A);
    createTableOnClusters(NO_REPLICATE_TABLE, CF_A);

    // Add peer, setReplicateAllUserTables true, but exclude one namespace.
    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
      .setClusterKey(UTIL2.getClusterKey()).setReplicateAllUserTables(true)
      .setExcludeNamespaces(Sets.newHashSet(NO_REPLICATE_NAMESPACE)).build();
    admin1.addReplicationPeer(PEER_ID2, peerConfig);
    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE));
    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A));

    assertEquals(0, queueStorage.getAllHFileRefs().size());

    // Bulk load data into the table of the namespace that is not replicated.
    bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A);
    Threads.sleep(1000);

    // Cannot get data from remote cluster. The lookup must use the same row the bulk load wrote;
    // reading any other row would make this check pass even if the data had been replicated.
    try (Table table2 = UTIL2.getConnection().getTable(NO_REPLICATE_TABLE)) {
      Result result = table2.get(new Get(row));
      assertTrue(Bytes.equals(null, result.getValue(CF_A, qualifier)));
    }

    // The extra HFile is never added to the HFileRefs
    assertEquals(0, queueStorage.getAllHFileRefs().size());
  }

  /**
   * Writes a one-cell HFile for {@code family}, copies it under {@link #BULK_LOAD_BASE_DIR} on the
   * source cluster's DFS and bulk loads that directory into {@code tableName} on the source
   * cluster.
   * @param tableName the table to bulk load into
   * @param family    the column family the HFile belongs to
   */
  protected void bulkLoadOnCluster(TableName tableName, byte[] family) throws Exception {
    String bulkLoadFilePath = createHFileForFamilies(family);
    copyToHdfs(family, bulkLoadFilePath, UTIL1.getDFSCluster());
    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(UTIL1.getConfiguration());
    bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
  }

  /**
   * Creates a local HFile in {@link #testFolder} holding a single Put cell built from
   * {@link #row}, {@code family}, {@link #qualifier} and {@link #value}.
   * @param family the column family of the cell
   * @return the absolute local path of the new HFile
   * @throws IOException if the file cannot be created or written
   */
  private String createHFileForFamilies(byte[] family) throws IOException {
    CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
    cellBuilder.setRow(row).setFamily(family).setQualifier(qualifier).setValue(value)
      .setType(Cell.Type.Put);

    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(UTIL1.getConfiguration());
    File hFileLocation = testFolder.newFile();
    // Resources close in reverse order: the writer first, then the underlying stream.
    try (FSDataOutputStream out =
      new FSDataOutputStream(new FileOutputStream(hFileLocation), null)) {
      hFileFactory.withOutputStream(out);
      hFileFactory.withFileContext(new HFileContextBuilder().build());
      try (HFile.Writer writer = hFileFactory.create()) {
        writer.append(new KeyValue(cellBuilder.build()));
      }
    }
    return hFileLocation.getAbsoluteFile().getAbsolutePath();
  }

  /**
   * Copies a local HFile into the per-family sub-directory of {@link #BULK_LOAD_BASE_DIR} on the
   * given DFS cluster, creating the directory first.
   * @param family           the column family, used as the sub-directory name
   * @param bulkLoadFilePath the local path of the HFile to copy
   * @param cluster          the DFS cluster to copy the file to
   */
  private void copyToHdfs(byte[] family, String bulkLoadFilePath, MiniDFSCluster cluster)
    throws Exception {
    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, Bytes.toString(family));
    cluster.getFileSystem().mkdirs(bulkLoadDir);
    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
  }

  /**
   * Creates {@code tableName} on both clusters with the given column families, each with global
   * replication scope.
   * @param tableName the table to create
   * @param cfs       the column families of the table
   */
  private void createTableOnClusters(TableName tableName, byte[]... cfs) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (byte[] cf : cfs) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cf)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
    }
    TableDescriptor td = builder.build();
    admin1.createTable(td);
    admin2.createTable(td);
  }
}