/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.row;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class);

  @Override
  protected void customizeClusterConf(Configuration conf) {
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
    conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
    conf.set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
  }

  @Test
  public void testSyncUpTool() throws Exception {
    // Set up Replication:
    // on Master and one Slave
    // Tables: t1_syncup and t2_syncup
    // columnfamily:
    // 'cf1' : replicated
    // 'norep': not replicated
    setupReplication();

    // Prepare 24 random hfile ranges required for creating hfiles
    Set<String> randomHFileRanges = new HashSet<>(24);
    for (int i = 0; i < 24; i++) {
      randomHFileRanges.add(HBaseTestingUtil.getRandomUUID().toString());
    }
    List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
    Collections.sort(randomHFileRangeList);
    Iterator<String> randomHFileRangeListIterator = randomHFileRangeList.iterator();

    // at Master:
    // t1_syncup: load 50 rows into cf1, 50 rows from other hdfs into cf1, and 3 rows into norep
    // t2_syncup: load 100 rows into cf1, 100 rows from other hdfs into cf1, and 3 rows into norep
    // verify correctly replicated to slave
    loadAndReplicateHFiles(true, randomHFileRangeListIterator);

    // Verify hfile load works
    //
    // step 1: stop hbase on Slave
    //
    // step 2: at Master:
    // t1_syncup: load another 100 rows into cf1 and 3 rows into norep
    // t2_syncup: load another 200 rows into cf1 and 3 rows into norep
    //
    // step 3: stop hbase on Master, restart hbase on Slave
    //
    // step 4: verify Slave still has the rows from before the load
    // t1_syncup: 100 rows from cf1
    // t2_syncup: 200 rows from cf1
    //
    // step 5: run syncup tool on Master
    //
    // step 6: verify that the hfiles show up on Slave and 'norep' does not
    // t1_syncup: 200 rows from cf1
    // t2_syncup: 400 rows from cf1
    // verify correctly replicated to Slave
    mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
  }

  private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
    throws Exception {
    LOG.debug("mimicSyncUpAfterBulkLoad");
    shutDownTargetHBaseCluster();

    loadAndReplicateHFiles(false, randomHFileRangeListIterator);

    int rowCountHt1Source = countRows(ht1Source);
    assertEquals("t1_syncup has 206 rows on source after bulk loading another 103 rows", 206,
      rowCountHt1Source);

    int rowCountHt2Source = countRows(ht2Source);
    assertEquals("t2_syncup has 406 rows on source after bulk loading another 203 rows", 406,
      rowCountHt2Source);

    shutDownSourceHBaseCluster();
    restartTargetHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // Before sync up
    int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);

    // Run sync up tool
    syncUp(UTIL1);

    // After sync up
    rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should be synced up and have 200 rows", 200,
      rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should be synced up and have 400 rows", 400,
      rowCountHt2TargetAtPeer1);
  }

  private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
    Iterator<String> randomHFileRangeListIterator) throws Exception {
    LOG.debug("loadAndReplicateHFiles");

    // Load hfiles with 50 + 50 + 3 rows into t1_syncup.
    byte[][][] hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 50);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source,
      hfileRanges, 50);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source,
      hfileRanges, 3);

    // Load hfiles with 100 + 100 + 3 rows into t2_syncup.
    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 100);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source,
      hfileRanges, 100);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht2Source,
      hfileRanges, 3);

    if (verifyReplicationOnSlave) {
      // ensure replication completed
      wait(ht1TargetAtPeer1, countRows(ht1Source) - 3,
        "t1_syncup has 103 rows on source, and 100 on slave1");

      wait(ht2TargetAtPeer1, countRows(ht2Source) - 3,
        "t2_syncup has 203 rows on source, and 200 on slave1");
    }
  }

  private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
    Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
    Path dir = UTIL1.getDataTestDirOnTestFS(testName);
    FileSystem fs = UTIL1.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(fam));

    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(UTIL1.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    final TableName tableName = source.getName();
    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
    loader.bulkLoad(tableName, dir);
  }

  private void loadFromOtherHDFSAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
    Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
    // Create the hfiles on the other cluster's HDFS (UTIL2), then bulk load them into the
    // source cluster's table, so the bulk-loaded files originate on a different file system.
    Path dir = UTIL2.getDataTestDirOnTestFS(testName);
    FileSystem fs = UTIL2.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(fam));

    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(UTIL2.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    final TableName tableName = source.getName();
    // Bulk load into the source cluster (UTIL1); dir is fully qualified, so it still points
    // at UTIL2's file system.
    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
    loader.bulkLoad(tableName, dir);
  }

  private void wait(Table target, int expectedCount, String msg)
    throws IOException, InterruptedException {
    // Poll the target table until it reaches the expected row count; fail on the last retry.
    for (int i = 0; i < NB_RETRIES; i++) {
      int rowCount = countRows(target);
      if (i == NB_RETRIES - 1) {
        assertEquals(msg, expectedCount, rowCount);
      }
      if (expectedCount == rowCount) {
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }
}