001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.test; 019 020import java.util.Collections; 021import java.util.HashMap; 022import java.util.Iterator; 023import java.util.List; 024import java.util.Set; 025import java.util.TreeSet; 026import java.util.UUID; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.conf.Configured; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.HRegionLocation; 033import org.apache.hadoop.hbase.IntegrationTestingUtility; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.Admin; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.client.ConnectionFactory; 039import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 040import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 041import org.apache.hadoop.util.Tool; 042import org.apache.hadoop.util.ToolRunner; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 047import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 048import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 049 050/** 051 * This is an integration test for replication. It is derived off 052 * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} that creates a large circular 053 * linked list in one cluster and verifies that the data is correct in a sink cluster. The test 054 * handles creating the tables and schema and setting up the replication. 055 */ 056public class IntegrationTestReplication extends IntegrationTestBigLinkedList { 057 protected String sourceClusterIdString; 058 protected String sinkClusterIdString; 059 protected int numIterations; 060 protected int numMappers; 061 protected long numNodes; 062 protected String outputDir; 063 protected int numReducers; 064 protected int generateVerifyGap; 065 protected Integer width; 066 protected Integer wrapMultiplier; 067 protected boolean noReplicationSetup = false; 068 069 private final String SOURCE_CLUSTER_OPT = "sourceCluster"; 070 private final String DEST_CLUSTER_OPT = "destCluster"; 071 private final String ITERATIONS_OPT = "iterations"; 072 private final String NUM_MAPPERS_OPT = "numMappers"; 073 private final String OUTPUT_DIR_OPT = "outputDir"; 074 private final String NUM_REDUCERS_OPT = "numReducers"; 075 private final String NO_REPLICATION_SETUP_OPT = "noReplicationSetup"; 076 077 /** 078 * The gap (in seconds) from when data is finished being generated at the source to when it can be 079 * verified. This is the replication lag we are willing to tolerate 080 */ 081 private final String GENERATE_VERIFY_GAP_OPT = "generateVerifyGap"; 082 083 /** 084 * The width of the linked list. See 085 * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details 086 */ 087 private final String WIDTH_OPT = "width"; 088 089 /** 090 * The number of rows after which the linked list points to the first row. See 091 * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details 092 */ 093 private final String WRAP_MULTIPLIER_OPT = "wrapMultiplier"; 094 095 /** 096 * The number of nodes in the test setup. This has to be a multiple of WRAP_MULTIPLIER * WIDTH in 097 * order to ensure that the linked list can is complete. See 098 * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details 099 */ 100 private final String NUM_NODES_OPT = "numNodes"; 101 102 private final int DEFAULT_NUM_MAPPERS = 1; 103 private final int DEFAULT_NUM_REDUCERS = 1; 104 private final int DEFAULT_NUM_ITERATIONS = 1; 105 private final int DEFAULT_GENERATE_VERIFY_GAP = 60; 106 private final int DEFAULT_WIDTH = 1000000; 107 private final int DEFAULT_WRAP_MULTIPLIER = 25; 108 private final int DEFAULT_NUM_NODES = DEFAULT_WIDTH * DEFAULT_WRAP_MULTIPLIER; 109 110 /** 111 * Wrapper around an HBase ClusterID allowing us to get admin connections and configurations for 112 * it 113 */ 114 protected static class ClusterID { 115 private final Configuration configuration; 116 private Connection connection = null; 117 118 /** 119 * This creates a new ClusterID wrapper that will automatically build connections and 120 * configurations to be able to talk to the specified cluster 121 * @param base the base configuration that this class will add to 122 * @param key the cluster key in the form of zk_quorum:zk_port:zk_parent_node 123 */ 124 public ClusterID(Configuration base, String key) { 125 configuration = new Configuration(base); 126 Iterator<String> iter = Splitter.on(':').split(key).iterator(); 127 configuration.set(HConstants.ZOOKEEPER_QUORUM, iter.next()); 128 configuration.set(HConstants.ZOOKEEPER_CLIENT_PORT, iter.next()); 129 configuration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, iter.next()); 130 } 131 132 @Override 133 public String toString() { 134 return Joiner.on(":").join(configuration.get(HConstants.ZOOKEEPER_QUORUM), 135 configuration.get(HConstants.ZOOKEEPER_CLIENT_PORT), 136 configuration.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); 137 } 138 139 public Configuration getConfiguration() { 140 return this.configuration; 141 } 142 143 public Connection getConnection() throws Exception { 144 if (this.connection == null) { 145 this.connection = ConnectionFactory.createConnection(this.configuration); 146 } 147 return this.connection; 148 } 149 150 public void closeConnection() throws Exception { 151 this.connection.close(); 152 this.connection = null; 153 } 154 155 @Override 156 public boolean equals(Object other) { 157 if (this == other) { 158 return true; 159 } 160 if (!(other instanceof ClusterID)) { 161 return false; 162 } 163 return toString().equalsIgnoreCase(other.toString()); 164 } 165 166 @Override 167 public int hashCode() { 168 return toString().hashCode(); 169 } 170 } 171 172 /** 173 * The main runner loop for the test. It uses 174 * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for the generation and 175 * verification of the linked list. It is heavily based on 176 * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Loop} 177 */ 178 protected class VerifyReplicationLoop extends Configured implements Tool { 179 private final Logger LOG = LoggerFactory.getLogger(VerifyReplicationLoop.class); 180 protected ClusterID source; 181 protected ClusterID sink; 182 183 IntegrationTestBigLinkedList integrationTestBigLinkedList; 184 185 /** 186 * This tears down any tables that existed from before and rebuilds the tables and schemas on 187 * the source cluster. It then sets up replication from the source to the sink cluster by using 188 * the {@link org.apache.hadoop.hbase.client.replication.ReplicationAdmin} connection. 189 */ 190 protected void setupTablesAndReplication() throws Exception { 191 TableName tableName = getTableName(source.getConfiguration()); 192 193 ClusterID[] clusters = { source, sink }; 194 195 // delete any old tables in the source and sink 196 for (ClusterID cluster : clusters) { 197 Admin admin = cluster.getConnection().getAdmin(); 198 199 if (admin.tableExists(tableName)) { 200 if (admin.isTableEnabled(tableName)) { 201 admin.disableTable(tableName); 202 } 203 204 /** 205 * TODO: This is a work around on a replication bug (HBASE-13416) When we recreate a table 206 * against that has recently been deleted, the contents of the logs are replayed even 207 * though they should not. This ensures that we flush the logs before the table gets 208 * deleted. Eventually the bug should be fixed and this should be removed. 209 */ 210 Set<ServerName> regionServers = new TreeSet<>(); 211 for (HRegionLocation rl : cluster.getConnection().getRegionLocator(tableName) 212 .getAllRegionLocations()) { 213 regionServers.add(rl.getServerName()); 214 } 215 216 for (ServerName server : regionServers) { 217 source.getConnection().getAdmin().rollWALWriter(server); 218 } 219 220 admin.deleteTable(tableName); 221 } 222 } 223 224 // create the schema 225 Generator generator = new Generator(); 226 generator.setConf(source.getConfiguration()); 227 generator.createSchema(); 228 229 // setup the replication on the source 230 if (!source.equals(sink)) { 231 try (final Admin admin = source.getConnection().getAdmin()) { 232 // remove any old replication peers 233 for (ReplicationPeerDescription peer : admin.listReplicationPeers()) { 234 admin.removeReplicationPeer(peer.getPeerId()); 235 } 236 237 // set the test table to be the table to replicate 238 HashMap<TableName, List<String>> toReplicate = new HashMap<>(); 239 toReplicate.put(tableName, Collections.emptyList()); 240 241 // set the sink to be the target 242 final ReplicationPeerConfig peerConfig = 243 ReplicationPeerConfig.newBuilder().setClusterKey(sink.toString()) 244 .setReplicateAllUserTables(false).setTableCFsMap(toReplicate).build(); 245 246 admin.addReplicationPeer("TestPeer", peerConfig); 247 admin.enableTableReplication(tableName); 248 } 249 } 250 251 for (ClusterID cluster : clusters) { 252 cluster.closeConnection(); 253 } 254 } 255 256 protected void waitForReplication() throws Exception { 257 // TODO: we shouldn't be sleeping here. It would be better to query the region servers 258 // and wait for them to report 0 replication lag. 259 Thread.sleep(generateVerifyGap * 1000); 260 } 261 262 /** 263 * Run the {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Generator} in the 264 * source cluster. This assumes that the tables have been setup via setupTablesAndReplication. 265 */ 266 protected void runGenerator() throws Exception { 267 Path outputPath = new Path(outputDir); 268 UUID uuid = util.getRandomUUID(); // create a random UUID. 269 Path generatorOutput = new Path(outputPath, uuid.toString()); 270 271 Generator generator = new Generator(); 272 generator.setConf(source.getConfiguration()); 273 274 // Disable concurrent walkers for IntegrationTestReplication 275 int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, 0); 276 if (retCode > 0) { 277 throw new RuntimeException("Generator failed with return code: " + retCode); 278 } 279 } 280 281 /** 282 * Run the {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Verify} in the sink 283 * cluster. If replication is working properly the data written at the source cluster should be 284 * available in the sink cluster after a reasonable gap 285 * @param expectedNumNodes the number of nodes we are expecting to see in the sink cluster 286 */ 287 protected void runVerify(long expectedNumNodes) throws Exception { 288 Path outputPath = new Path(outputDir); 289 UUID uuid = util.getRandomUUID(); // create a random UUID. 290 Path iterationOutput = new Path(outputPath, uuid.toString()); 291 292 Verify verify = new Verify(); 293 verify.setConf(sink.getConfiguration()); 294 295 int retCode = verify.run(iterationOutput, numReducers); 296 if (retCode > 0) { 297 throw new RuntimeException("Verify.run failed with return code: " + retCode); 298 } 299 300 if (!verify.verify(expectedNumNodes)) { 301 throw new RuntimeException("Verify.verify failed"); 302 } 303 304 LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes); 305 } 306 307 /** 308 * The main test runner This test has 4 steps: 1: setupTablesAndReplication 2: generate the data 309 * into the source cluster 3: wait for replication to propagate 4: verify that the data is 310 * available in the sink cluster 311 * @param args should be empty 312 * @return 0 on success 313 * @throws Exception on an error 314 */ 315 @Override 316 public int run(String[] args) throws Exception { 317 source = new ClusterID(getConf(), sourceClusterIdString); 318 sink = new ClusterID(getConf(), sinkClusterIdString); 319 320 if (!noReplicationSetup) { 321 setupTablesAndReplication(); 322 } 323 long expectedNumNodes = 0; 324 for (int i = 0; i < numIterations; i++) { 325 LOG.info("Starting iteration = " + i); 326 327 expectedNumNodes += numMappers * numNodes; 328 329 runGenerator(); 330 waitForReplication(); 331 runVerify(expectedNumNodes); 332 } 333 334 /** 335 * we are always returning 0 because exceptions are thrown when there is an error in the 336 * verification step. 337 */ 338 return 0; 339 } 340 } 341 342 @Override 343 protected void addOptions() { 344 super.addOptions(); 345 addRequiredOptWithArg("s", SOURCE_CLUSTER_OPT, 346 "Cluster ID of the source cluster (e.g. localhost:2181:/hbase)"); 347 addRequiredOptWithArg("r", DEST_CLUSTER_OPT, 348 "Cluster ID of the sink cluster (e.g. localhost:2182:/hbase)"); 349 addRequiredOptWithArg("d", OUTPUT_DIR_OPT, 350 "Temporary directory where to write keys for the test"); 351 352 addOptWithArg("nm", NUM_MAPPERS_OPT, 353 "Number of mappers (default: " + DEFAULT_NUM_MAPPERS + ")"); 354 addOptWithArg("nr", NUM_REDUCERS_OPT, 355 "Number of reducers (default: " + DEFAULT_NUM_MAPPERS + ")"); 356 addOptNoArg("nrs", NO_REPLICATION_SETUP_OPT, 357 "Don't setup tables or configure replication before starting test"); 358 addOptWithArg("n", NUM_NODES_OPT, 359 "Number of nodes. This should be a multiple of width * wrapMultiplier." + " (default: " 360 + DEFAULT_NUM_NODES + ")"); 361 addOptWithArg("i", ITERATIONS_OPT, 362 "Number of iterations to run (default: " + DEFAULT_NUM_ITERATIONS + ")"); 363 addOptWithArg("t", GENERATE_VERIFY_GAP_OPT, 364 "Gap between generate and verify steps in seconds (default: " + DEFAULT_GENERATE_VERIFY_GAP 365 + ")"); 366 addOptWithArg("w", WIDTH_OPT, 367 "Width of the linked list chain (default: " + DEFAULT_WIDTH + ")"); 368 addOptWithArg("wm", WRAP_MULTIPLIER_OPT, 369 "How many times to wrap around (default: " + DEFAULT_WRAP_MULTIPLIER + ")"); 370 } 371 372 @Override 373 protected void processOptions(CommandLine cmd) { 374 processBaseOptions(cmd); 375 376 sourceClusterIdString = cmd.getOptionValue(SOURCE_CLUSTER_OPT); 377 sinkClusterIdString = cmd.getOptionValue(DEST_CLUSTER_OPT); 378 outputDir = cmd.getOptionValue(OUTPUT_DIR_OPT); 379 380 /** This uses parseInt from {@link org.apache.hadoop.hbase.util.AbstractHBaseTool} */ 381 numMappers = 382 parseInt(cmd.getOptionValue(NUM_MAPPERS_OPT, Integer.toString(DEFAULT_NUM_MAPPERS)), 1, 383 Integer.MAX_VALUE); 384 numReducers = 385 parseInt(cmd.getOptionValue(NUM_REDUCERS_OPT, Integer.toString(DEFAULT_NUM_REDUCERS)), 1, 386 Integer.MAX_VALUE); 387 numNodes = parseInt(cmd.getOptionValue(NUM_NODES_OPT, Integer.toString(DEFAULT_NUM_NODES)), 1, 388 Integer.MAX_VALUE); 389 generateVerifyGap = parseInt( 390 cmd.getOptionValue(GENERATE_VERIFY_GAP_OPT, Integer.toString(DEFAULT_GENERATE_VERIFY_GAP)), 1, 391 Integer.MAX_VALUE); 392 numIterations = 393 parseInt(cmd.getOptionValue(ITERATIONS_OPT, Integer.toString(DEFAULT_NUM_ITERATIONS)), 1, 394 Integer.MAX_VALUE); 395 width = parseInt(cmd.getOptionValue(WIDTH_OPT, Integer.toString(DEFAULT_WIDTH)), 1, 396 Integer.MAX_VALUE); 397 wrapMultiplier = 398 parseInt(cmd.getOptionValue(WRAP_MULTIPLIER_OPT, Integer.toString(DEFAULT_WRAP_MULTIPLIER)), 399 1, Integer.MAX_VALUE); 400 401 if (cmd.hasOption(NO_REPLICATION_SETUP_OPT)) { 402 noReplicationSetup = true; 403 } 404 405 if (numNodes % (width * wrapMultiplier) != 0) { 406 throw new RuntimeException("numNodes must be a multiple of width and wrap multiplier"); 407 } 408 } 409 410 @Override 411 public int runTestFromCommandLine() throws Exception { 412 VerifyReplicationLoop tool = new VerifyReplicationLoop(); 413 tool.integrationTestBigLinkedList = this; 414 return ToolRunner.run(getConf(), tool, null); 415 } 416 417 public static void main(String[] args) throws Exception { 418 Configuration conf = HBaseConfiguration.create(); 419 IntegrationTestingUtility.setUseDistributedCluster(conf); 420 int ret = ToolRunner.run(conf, new IntegrationTestReplication(), args); 421 System.exit(ret); 422 } 423}