001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
021import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertTrue;
025
026import java.io.File;
027import java.io.FileOutputStream;
028import java.io.IOException;
029import java.util.List;
030import java.util.Map;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FSDataOutputStream;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.CellBuilder;
036import org.apache.hadoop.hbase.CellBuilderFactory;
037import org.apache.hadoop.hbase.CellBuilderType;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.NamespaceDescriptor;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Admin;
044import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
045import org.apache.hadoop.hbase.client.Get;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.client.TableDescriptor;
049import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
050import org.apache.hadoop.hbase.io.hfile.HFile;
051import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
052import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
053import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
054import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
055import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
056import org.apache.hadoop.hbase.replication.TestReplicationBase;
057import org.apache.hadoop.hbase.testclassification.ReplicationTests;
058import org.apache.hadoop.hbase.testclassification.SmallTests;
059import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.util.Threads;
062import org.apache.hadoop.hdfs.MiniDFSCluster;
063import org.junit.After;
064import org.junit.Assert;
065import org.junit.Before;
066import org.junit.BeforeClass;
067import org.junit.ClassRule;
068import org.junit.Test;
069import org.junit.experimental.categories.Category;
070import org.junit.rules.TemporaryFolder;
071
072import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
073import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
074import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
075
076@Category({ ReplicationTests.class, SmallTests.class })
077public class TestBulkLoadReplicationHFileRefs extends TestReplicationBase {
078
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081    HBaseClassTestRule.forClass(TestBulkLoadReplicationHFileRefs.class);
082
083  private static final String PEER1_CLUSTER_ID = "peer1";
084  private static final String PEER2_CLUSTER_ID = "peer2";
085
086  private static final String REPLICATE_NAMESPACE = "replicate_ns";
087  private static final String NO_REPLICATE_NAMESPACE = "no_replicate_ns";
088  private static final TableName REPLICATE_TABLE =
089    TableName.valueOf(REPLICATE_NAMESPACE, "replicate_table");
090  private static final TableName NO_REPLICATE_TABLE =
091    TableName.valueOf(NO_REPLICATE_NAMESPACE, "no_replicate_table");
092  private static final byte[] CF_A = Bytes.toBytes("cfa");
093  private static final byte[] CF_B = Bytes.toBytes("cfb");
094
095  private byte[] row = Bytes.toBytes("r1");
096  private byte[] qualifier = Bytes.toBytes("q1");
097  private byte[] value = Bytes.toBytes("v1");
098
099  @ClassRule
100  public static TemporaryFolder testFolder = new TemporaryFolder();
101
102  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
103
104  private static Admin admin1;
105  private static Admin admin2;
106
107  private static ReplicationQueueStorage queueStorage;
108
109  @BeforeClass
110  public static void setUpBeforeClass() throws Exception {
111    setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
112    setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
113    TestReplicationBase.setUpBeforeClass();
114    admin1 = UTIL1.getConnection().getAdmin();
115    admin2 = UTIL2.getConnection().getAdmin();
116
117    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getZooKeeperWatcher(),
118      UTIL1.getConfiguration());
119
120    admin1.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build());
121    admin2.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build());
122    admin1.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build());
123    admin2.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build());
124  }
125
126  protected static void setupBulkLoadConfigsForCluster(Configuration config,
127    String clusterReplicationId) throws Exception {
128    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
129    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
130    File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
131    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() + "/hbase-site.xml");
132    config.writeXml(new FileOutputStream(sourceConfigFile));
133    config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
134  }
135
136  @Before
137  public void setUp() throws Exception {
138    for (ReplicationPeerDescription peer : admin1.listReplicationPeers()) {
139      admin1.removeReplicationPeer(peer.getPeerId());
140    }
141  }
142
143  @After
144  public void teardown() throws Exception {
145    for (ReplicationPeerDescription peer : admin1.listReplicationPeers()) {
146      admin1.removeReplicationPeer(peer.getPeerId());
147    }
148    for (TableName tableName : admin1.listTableNames()) {
149      UTIL1.deleteTable(tableName);
150    }
151    for (TableName tableName : admin2.listTableNames()) {
152      UTIL2.deleteTable(tableName);
153    }
154  }
155
156  @Test
157  public void testWhenExcludeCF() throws Exception {
158    // Create table in source and remote clusters.
159    createTableOnClusters(REPLICATE_TABLE, CF_A, CF_B);
160    // Add peer, setReplicateAllUserTables true, but exclude CF_B.
161    Map<TableName, List<String>> excludeTableCFs = Maps.newHashMap();
162    excludeTableCFs.put(REPLICATE_TABLE, Lists.newArrayList(Bytes.toString(CF_B)));
163    ReplicationPeerConfig peerConfig =
164      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
165        .setReplicateAllUserTables(true).setExcludeTableCFsMap(excludeTableCFs).build();
166    admin1.addReplicationPeer(PEER_ID2, peerConfig);
167    Assert.assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
168    Assert.assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
169    Assert.assertFalse(peerConfig.needToReplicate(REPLICATE_TABLE, CF_B));
170
171    assertEquals(0, queueStorage.getAllHFileRefs().size());
172
173    // Bulk load data into the CF that is not replicated.
174    bulkLoadOnCluster(REPLICATE_TABLE, CF_B);
175    Threads.sleep(1000);
176
177    // Cannot get data from remote cluster
178    Table table2 = UTIL2.getConnection().getTable(REPLICATE_TABLE);
179    Result result = table2.get(new Get(row));
180    assertTrue(Bytes.equals(null, result.getValue(CF_B, qualifier)));
181    // The extra HFile is never added to the HFileRefs
182    assertEquals(0, queueStorage.getAllHFileRefs().size());
183  }
184
185  @Test
186  public void testWhenExcludeTable() throws Exception {
187    // Create 2 tables in source and remote clusters.
188    createTableOnClusters(REPLICATE_TABLE, CF_A);
189    createTableOnClusters(NO_REPLICATE_TABLE, CF_A);
190
191    // Add peer, setReplicateAllUserTables true, but exclude one table.
192    Map<TableName, List<String>> excludeTableCFs = Maps.newHashMap();
193    excludeTableCFs.put(NO_REPLICATE_TABLE, null);
194    ReplicationPeerConfig peerConfig =
195      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
196        .setReplicateAllUserTables(true).setExcludeTableCFsMap(excludeTableCFs).build();
197    admin1.addReplicationPeer(PEER_ID2, peerConfig);
198    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
199    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE));
200    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
201    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A));
202
203    assertEquals(0, queueStorage.getAllHFileRefs().size());
204
205    // Bulk load data into the table that is not replicated.
206    bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A);
207    Threads.sleep(1000);
208
209    // Cannot get data from remote cluster
210    Table table2 = UTIL2.getConnection().getTable(NO_REPLICATE_TABLE);
211    Result result = table2.get(new Get(row));
212    assertTrue(Bytes.equals(null, result.getValue(CF_A, qualifier)));
213
214    // The extra HFile is never added to the HFileRefs
215    assertEquals(0, queueStorage.getAllHFileRefs().size());
216  }
217
218  @Test
219  public void testWhenExcludeNamespace() throws Exception {
220    // Create 2 tables in source and remote clusters.
221    createTableOnClusters(REPLICATE_TABLE, CF_A);
222    createTableOnClusters(NO_REPLICATE_TABLE, CF_A);
223
224    // Add peer, setReplicateAllUserTables true, but exclude one namespace.
225    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
226      .setClusterKey(UTIL2.getClusterKey()).setReplicateAllUserTables(true)
227      .setExcludeNamespaces(Sets.newHashSet(NO_REPLICATE_NAMESPACE)).build();
228    admin1.addReplicationPeer(PEER_ID2, peerConfig);
229    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
230    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE));
231    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
232    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A));
233
234    assertEquals(0, queueStorage.getAllHFileRefs().size());
235
236    // Bulk load data into the table of the namespace that is not replicated.
237    byte[] row = Bytes.toBytes("001");
238    byte[] value = Bytes.toBytes("v1");
239    bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A);
240    Threads.sleep(1000);
241
242    // Cannot get data from remote cluster
243    Table table2 = UTIL2.getConnection().getTable(NO_REPLICATE_TABLE);
244    Result result = table2.get(new Get(row));
245    assertTrue(Bytes.equals(null, result.getValue(CF_A, qualifier)));
246
247    // The extra HFile is never added to the HFileRefs
248    assertEquals(0, queueStorage.getAllHFileRefs().size());
249  }
250
251  protected void bulkLoadOnCluster(TableName tableName, byte[] family) throws Exception {
252    String bulkLoadFilePath = createHFileForFamilies(family);
253    copyToHdfs(family, bulkLoadFilePath, UTIL1.getDFSCluster());
254    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(UTIL1.getConfiguration());
255    bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
256  }
257
258  private String createHFileForFamilies(byte[] family) throws IOException {
259    CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
260    cellBuilder.setRow(row).setFamily(family).setQualifier(qualifier).setValue(value)
261      .setType(Cell.Type.Put);
262
263    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(UTIL1.getConfiguration());
264    File hFileLocation = testFolder.newFile();
265    FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
266    try {
267      hFileFactory.withOutputStream(out);
268      hFileFactory.withFileContext(new HFileContextBuilder().build());
269      HFile.Writer writer = hFileFactory.create();
270      try {
271        writer.append(new KeyValue(cellBuilder.build()));
272      } finally {
273        writer.close();
274      }
275    } finally {
276      out.close();
277    }
278    return hFileLocation.getAbsoluteFile().getAbsolutePath();
279  }
280
281  private void copyToHdfs(byte[] family, String bulkLoadFilePath, MiniDFSCluster cluster)
282    throws Exception {
283    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, Bytes.toString(family));
284    cluster.getFileSystem().mkdirs(bulkLoadDir);
285    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
286  }
287
288  private void createTableOnClusters(TableName tableName, byte[]... cfs) throws IOException {
289    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
290    for (byte[] cf : cfs) {
291      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cf)
292        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
293    }
294    TableDescriptor td = builder.build();
295    admin1.createTable(td);
296    admin2.createTable(td);
297  }
298}