/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.row;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class);

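  /**
   * Enable bulk load replication (which requires a replication cluster id) and quotas, and set a
   * source filesystem configuration provider so bulk loaded hfiles can be copied from the
   * originating cluster's filesystem during replication.
   */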
  @Override
  protected void customizeClusterConf(Configuration conf) {
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
    conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
    conf.set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
  }

  @Test
  public void testSyncUpTool() throws Exception {
    // Set up replication:
    // one master and one slave cluster, tables: t1_syncup and t2_syncup
    // column families:
    // 'cf1'  : replicated
    // 'norep': not replicated
    setupReplication();

    // Prepare 24 random hfile ranges required for creating hfiles
    Set<String> randomHFileRanges = new HashSet<>(24);
    for (int i = 0; i < 24; i++) {
      randomHFileRanges.add(HBaseTestingUtil.getRandomUUID().toString());
    }
    List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
    Collections.sort(randomHFileRangeList);
    Iterator<String> randomHFileRangeListIterator = randomHFileRangeList.iterator();

    // at Master:
    // t1_syncup: Load 50 rows into cf1, and 50 rows from other hdfs into cf1, and 3 rows into norep
    // t2_syncup: Load 100 rows into cf1, and 100 rows from other hdfs into cf1, and 3 rows into
    // norep
    // verify correctly replicated to slave
    loadAndReplicateHFiles(true, randomHFileRangeListIterator);

    // Verify hfile load works
    //
    // step 1: stop hbase on Slave
    //
    // step 2: at Master:
    // t1_syncup: Load another 100 rows into cf1 and 3 rows into norep
    // t2_syncup: Load another 200 rows into cf1 and 3 rows into norep
    //
    // step 3: stop hbase on master, restart hbase on Slave
    //
    // step 4: verify Slave still has the rows before load
    // t1_syncup: 100 rows from cf1
    // t2_syncup: 200 rows from cf1
    //
    // step 5: run syncup tool on Master
    //
    // step 6: verify that hfiles show up on Slave and 'norep' does not
    // t1_syncup: 200 rows from cf1
    // t2_syncup: 400 rows from cf1
    // verify correctly replicated to Slave
    mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
  }

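  /**
   * Bulk load more data on the source while the target cluster is down, then stop the source,
   * restart the target and run the sync up tool. Verifies the target only catches up on the
   * replicated family after the tool has run.
   */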
  private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
    throws Exception {
    LOG.debug("mimicSyncUpAfterBulkLoad");
    shutDownTargetHBaseCluster();

    loadAndReplicateHFiles(false, randomHFileRangeListIterator);

    int rowCountHt1Source = countRows(ht1Source);
    assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 rows", 206,
      rowCountHt1Source);

    int rowCountHt2Source = countRows(ht2Source);
    assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 rows", 406,
      rowCountHt2Source);

    shutDownSourceHBaseCluster();
    restartTargetHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // Before sync up
    int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);

    // Run sync up tool
    syncUp(UTIL1);

    // After sync up
    rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should be synced up and have 200 rows", 200,
      rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should be synced up and have 400 rows", 400,
      rowCountHt2TargetAtPeer1);
  }

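  /**
   * Bulk load hfiles into the replicated family (from the local and the remote filesystem) and
   * into the non-replicated family of both source tables, then optionally wait for the replicated
   * rows to show up on the slave.
   */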
  private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
    Iterator<String> randomHFileRangeListIterator) throws Exception {
    LOG.debug("loadAndReplicateHFiles");

    // Load 50 + 50 rows into cf1 and 3 rows into norep of t1_syncup via bulk loaded hfiles.
    byte[][][] hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 50);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source,
      hfileRanges, 50);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source,
      hfileRanges, 3);

    // Load 100 + 100 rows into cf1 and 3 rows into norep of t2_syncup via bulk loaded hfiles.
    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 100);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source,
      hfileRanges, 100);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht2Source,
      hfileRanges, 3);

    if (verifyReplicationOnSlave) {
      // ensure replication completed
      wait(ht1TargetAtPeer1, countRows(ht1Source) - 3,
        "t1_syncup has 103 rows on source, and 100 on slave1");

      wait(ht2TargetAtPeer1, countRows(ht2Source) - 3,
        "t2_syncup has 203 rows on source, and 200 on slave1");
    }
  }

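  /**
   * Create hfiles for the given ranges on the source cluster's test filesystem and bulk load them
   * into the given source table.
   */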
  private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
    Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
    Path dir = UTIL1.getDataTestDirOnTestFS(testName);
    FileSystem fs = UTIL1.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(fam));

    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(UTIL1.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    final TableName tableName = source.getName();
    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
    loader.bulkLoad(tableName, dir);
  }

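  /**
   * Create hfiles on the peer cluster's filesystem but bulk load them into the given source table,
   * so that the bulk loaded files live on a filesystem other than the source cluster's own.
   */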
  private void loadFromOtherHDFSAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
    Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
    Path dir = UTIL2.getDataTestDirOnTestFS(testName);
    FileSystem fs = UTIL2.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(fam));

    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(UTIL2.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    final TableName tableName = source.getName();
    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
    loader.bulkLoad(tableName, dir);
  }

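  /**
   * Poll the target table until it reaches the expected row count, asserting the count on the
   * last retry.
   */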
  private void wait(Table target, int expectedCount, String msg)
    throws IOException, InterruptedException {
    for (int i = 0; i < NB_RETRIES; i++) {
      int rowCount = countRows(target);
      if (i == NB_RETRIES - 1) {
        assertEquals(msg, expectedCount, rowCount);
      }
      if (expectedCount == rowCount) {
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }
}