001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
021import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertTrue;
024
025import java.io.File;
026import java.io.FileOutputStream;
027import java.io.IOException;
028import java.net.UnknownHostException;
029import java.util.List;
030import java.util.Map;
031import java.util.Optional;
032import java.util.concurrent.CountDownLatch;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicInteger;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FSDataOutputStream;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.CellBuilderType;
040import org.apache.hadoop.hbase.ExtendedCellBuilder;
041import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseTestingUtil;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.KeyValue;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.Admin;
048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
049import org.apache.hadoop.hbase.client.Connection;
050import org.apache.hadoop.hbase.client.ConnectionFactory;
051import org.apache.hadoop.hbase.client.Get;
052import org.apache.hadoop.hbase.client.Result;
053import org.apache.hadoop.hbase.client.Table;
054import org.apache.hadoop.hbase.client.TableDescriptor;
055import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
056import org.apache.hadoop.hbase.coprocessor.ObserverContext;
057import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
058import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
059import org.apache.hadoop.hbase.coprocessor.RegionObserver;
060import org.apache.hadoop.hbase.io.hfile.HFile;
061import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
062import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
063import org.apache.hadoop.hbase.replication.TestReplicationBase;
064import org.apache.hadoop.hbase.testclassification.MediumTests;
065import org.apache.hadoop.hbase.testclassification.ReplicationTests;
066import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
067import org.apache.hadoop.hbase.util.Bytes;
068import org.apache.hadoop.hbase.util.Pair;
069import org.apache.hadoop.hdfs.MiniDFSCluster;
070import org.junit.After;
071import org.junit.Before;
072import org.junit.BeforeClass;
073import org.junit.ClassRule;
074import org.junit.Rule;
075import org.junit.Test;
076import org.junit.experimental.categories.Category;
077import org.junit.rules.TemporaryFolder;
078import org.junit.rules.TestName;
079import org.slf4j.Logger;
080import org.slf4j.LoggerFactory;
081
/**
 * Integration test for bulk load replication. Defines three clusters, with the following
 * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between 2
 * and 3). For each of the defined test clusters, it performs a bulk load, asserting that the values
 * in the bulk loaded file get replicated to the other two peers. Since we are doing 3 bulk loads,
 * with the given replication topology each of these bulk loads should get replicated only once on
 * each peer. To assert this, this test defines a postBulkLoadHFile coprocessor and adds it to all
 * test table regions, on each of the clusters. This CP counts the number of times a bulk load
 * actually gets invoked, certifying we are not entering the infinite loop condition addressed by
 * HBASE-22380.
 */
092@Category({ ReplicationTests.class, MediumTests.class })
093public class TestBulkLoadReplication extends TestReplicationBase {
094
095  @ClassRule
096  public static final HBaseClassTestRule CLASS_RULE =
097    HBaseClassTestRule.forClass(TestBulkLoadReplication.class);
098
099  protected static final Logger LOG = LoggerFactory.getLogger(TestBulkLoadReplication.class);
100
101  private static final String PEER1_CLUSTER_ID = "peer1";
102  private static final String PEER2_CLUSTER_ID = "peer2";
103  private static final String PEER3_CLUSTER_ID = "peer3";
104
105  private static final String PEER_ID1 = "1";
106  private static final String PEER_ID3 = "3";
107
108  private static AtomicInteger BULK_LOADS_COUNT;
109  private static CountDownLatch BULK_LOAD_LATCH;
110
111  protected static final HBaseTestingUtil UTIL3 = new HBaseTestingUtil();
112  protected static final Configuration CONF3 = UTIL3.getConfiguration();
113
114  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
115
116  private static Table htable3;
117
118  @Rule
119  public TestName name = new TestName();
120
121  @ClassRule
122  public static TemporaryFolder testFolder = new TemporaryFolder();
123
124  @BeforeClass
125  public static void setUpBeforeClass() throws Exception {
126    setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
127    setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
128    setupBulkLoadConfigsForCluster(CONF3, PEER3_CLUSTER_ID);
129    setupConfig(UTIL3, "/3");
130    TestReplicationBase.setUpBeforeClass();
131    startThirdCluster();
132  }
133
134  private static void startThirdCluster() throws Exception {
135    LOG.info("Setup Zk to same one from UTIL1 and UTIL2");
136    UTIL3.setZkCluster(UTIL1.getZkCluster());
137    UTIL3.startMiniCluster(NUM_SLAVES1);
138
139    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
140      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMobEnabled(true)
141        .setMobThreshold(4000).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
142      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
143
144    Connection connection3 = ConnectionFactory.createConnection(CONF3);
145    try (Admin admin3 = connection3.getAdmin()) {
146      admin3.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
147    }
148    UTIL3.waitUntilAllRegionsAssigned(tableName);
149    htable3 = connection3.getTable(tableName);
150  }
151
152  @Before
153  @Override
154  public void setUpBase() throws Exception {
155    // "super.setUpBase()" already sets replication from 1->2,
156    // then on the subsequent lines, sets 2->1, 2->3 and 3->2.
157    // So we have following topology: "1 <-> 2 <->3"
158    super.setUpBase();
159    ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
160    ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
161    ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
162    // adds cluster1 as a remote peer on cluster2
163    UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
164    // adds cluster3 as a remote peer on cluster2
165    UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
166    // adds cluster2 as a remote peer on cluster3
167    UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
168    setupCoprocessor(UTIL1);
169    setupCoprocessor(UTIL2);
170    setupCoprocessor(UTIL3);
171    BULK_LOADS_COUNT = new AtomicInteger(0);
172  }
173
174  private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtil util)
175    throws UnknownHostException {
176    return ReplicationPeerConfig.newBuilder().setClusterKey(util.getRpcConnnectionURI())
177      .setSerial(isSerialPeer()).build();
178  }
179
180  private void setupCoprocessor(HBaseTestingUtil cluster) {
181    cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
182      try {
183        TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost()
184          .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
185        if (cp == null) {
186          r.getCoprocessorHost().load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
187            cluster.getConfiguration());
188          cp = r.getCoprocessorHost()
189            .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
190          cp.clusterName = cluster.getRpcConnnectionURI();
191        }
192      } catch (Exception e) {
193        LOG.error(e.getMessage(), e);
194      }
195    });
196  }
197
198  @After
199  @Override
200  public void tearDownBase() throws Exception {
201    super.tearDownBase();
202    UTIL2.getAdmin().removeReplicationPeer(PEER_ID1);
203    UTIL2.getAdmin().removeReplicationPeer(PEER_ID3);
204    UTIL3.getAdmin().removeReplicationPeer(PEER_ID2);
205  }
206
207  protected static void setupBulkLoadConfigsForCluster(Configuration config,
208    String clusterReplicationId) throws Exception {
209    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
210    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
211    File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
212    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() + "/hbase-site.xml");
213    config.writeXml(new FileOutputStream(sourceConfigFile));
214    config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
215  }
216
217  @Test
218  public void testBulkLoadReplicationActiveActive() throws Exception {
219    Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
220    Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
221    Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
222    byte[] row = Bytes.toBytes("001");
223    byte[] value = Bytes.toBytes("v1");
224    assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable, peer2TestTable,
225      peer3TestTable);
226    row = Bytes.toBytes("002");
227    value = Bytes.toBytes("v2");
228    assertBulkLoadConditions(tableName, row, value, UTIL2, peer1TestTable, peer2TestTable,
229      peer3TestTable);
230    row = Bytes.toBytes("003");
231    value = Bytes.toBytes("v3");
232    assertBulkLoadConditions(tableName, row, value, UTIL3, peer1TestTable, peer2TestTable,
233      peer3TestTable);
234    // Additional wait to make sure no extra bulk load happens
235    Thread.sleep(400);
236    // We have 3 bulk load events (1 initiated on each cluster).
237    // Each event gets 3 counts (the originator cluster, plus the two peers),
238    // so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
239    assertEquals(9, BULK_LOADS_COUNT.get());
240  }
241
242  protected void assertBulkLoadConditions(TableName tableName, byte[] row, byte[] value,
243    HBaseTestingUtil utility, Table... tables) throws Exception {
244    BULK_LOAD_LATCH = new CountDownLatch(3);
245    bulkLoadOnCluster(tableName, row, value, utility);
246    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
247    assertTableHasValue(tables[0], row, value);
248    assertTableHasValue(tables[1], row, value);
249    assertTableHasValue(tables[2], row, value);
250  }
251
252  protected void bulkLoadOnCluster(TableName tableName, byte[] row, byte[] value,
253    HBaseTestingUtil cluster) throws Exception {
254    String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration());
255    copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster());
256    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
257    bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
258  }
259
260  private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
261    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, "f");
262    cluster.getFileSystem().mkdirs(bulkLoadDir);
263    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
264  }
265
266  protected void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
267    Get get = new Get(row);
268    Result result = table.get(get);
269    assertTrue(result.advance());
270    assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
271  }
272
273  protected void assertTableNoValue(Table table, byte[] row, byte[] value) throws Exception {
274    Get get = new Get(row);
275    Result result = table.get(get);
276    assertTrue(result.isEmpty());
277  }
278
279  private String createHFileForFamilies(byte[] row, byte[] value, Configuration clusterConfig)
280    throws IOException {
281    ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
282    cellBuilder.setRow(row).setFamily(TestReplicationBase.famName).setQualifier(Bytes.toBytes("1"))
283      .setValue(value).setType(Cell.Type.Put);
284
285    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
286    // TODO We need a way to do this without creating files
287    File hFileLocation = testFolder.newFile();
288    FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
289    try {
290      hFileFactory.withOutputStream(out);
291      hFileFactory.withFileContext(new HFileContextBuilder().build());
292      HFile.Writer writer = hFileFactory.create();
293      try {
294        writer.append(new KeyValue(cellBuilder.build()));
295      } finally {
296        writer.close();
297      }
298    } finally {
299      out.close();
300    }
301    return hFileLocation.getAbsoluteFile().getAbsolutePath();
302  }
303
304  public static class BulkReplicationTestObserver implements RegionCoprocessor {
305
306    String clusterName;
307    AtomicInteger bulkLoadCounts = new AtomicInteger();
308
309    @Override
310    public Optional<RegionObserver> getRegionObserver() {
311      return Optional.of(new RegionObserver() {
312
313        @Override
314        public void postBulkLoadHFile(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
315          List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
316          throws IOException {
317          BULK_LOAD_LATCH.countDown();
318          BULK_LOADS_COUNT.incrementAndGet();
319          LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName,
320            bulkLoadCounts.addAndGet(1));
321        }
322      });
323    }
324  }
325}