001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.junit.Assert.assertTrue; 021 022import java.io.IOException; 023import java.nio.charset.Charset; 024import java.util.ArrayList; 025import java.util.List; 026import java.util.Set; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.backup.BackupAdmin; 031import org.apache.hadoop.hbase.backup.BackupInfo; 032import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; 033import org.apache.hadoop.hbase.backup.BackupRequest; 034import org.apache.hadoop.hbase.backup.BackupRestoreConstants; 035import org.apache.hadoop.hbase.backup.BackupType; 036import org.apache.hadoop.hbase.backup.RestoreRequest; 037import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; 038import org.apache.hadoop.hbase.backup.impl.BackupManager; 039import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; 040import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction; 041import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey; 042import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy; 043import org.apache.hadoop.hbase.chaos.policies.Policy; 044import org.apache.hadoop.hbase.client.Admin; 045import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 047import org.apache.hadoop.hbase.client.Connection; 048import org.apache.hadoop.hbase.client.Table; 049import org.apache.hadoop.hbase.client.TableDescriptor; 050import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 051import org.apache.hadoop.hbase.testclassification.IntegrationTests; 052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 053import org.apache.hadoop.util.ToolRunner; 054import org.junit.After; 055import org.junit.Assert; 056import org.junit.Before; 057import org.junit.Test; 058import org.junit.experimental.categories.Category; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 063import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 064import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; 065import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 066 067/** 068 * An integration test to detect regressions in HBASE-7912. Create a table with many regions, load 069 * data, perform series backup/load operations, then restore and verify data 070 * @see <a href="https://issues.apache.org/jira/browse/HBASE-7912">HBASE-7912</a> 071 * @see <a href="https://issues.apache.org/jira/browse/HBASE-14123">HBASE-14123</a> 072 */ 073@Category(IntegrationTests.class) 074public class IntegrationTestBackupRestore extends IntegrationTestBase { 075 private static final String CLASS_NAME = IntegrationTestBackupRestore.class.getSimpleName(); 076 protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBackupRestore.class); 077 protected static final String NUMBER_OF_TABLES_KEY = "num_tables"; 078 protected static final String COLUMN_NAME = "f"; 079 protected static final String REGION_COUNT_KEY = "regions_per_rs"; 080 protected static final String REGIONSERVER_COUNT_KEY = "region_servers"; 081 protected static final String ROWS_PER_ITERATION_KEY = "rows_in_iteration"; 082 protected static final String NUM_ITERATIONS_KEY = "num_iterations"; 083 protected static final int DEFAULT_REGION_COUNT = 10; 084 protected static final int DEFAULT_REGIONSERVER_COUNT = 5; 085 protected static final int DEFAULT_NUMBER_OF_TABLES = 1; 086 protected static final int DEFAULT_NUM_ITERATIONS = 10; 087 protected static final int DEFAULT_ROWS_IN_ITERATION = 500000; 088 protected static final String SLEEP_TIME_KEY = "sleeptime"; 089 // short default interval because tests don't run very long. 090 protected static final long SLEEP_TIME_DEFAULT = 50000L; 091 092 protected static int rowsInIteration; 093 protected static int regionsCountPerServer; 094 protected static int regionServerCount; 095 096 protected static int numIterations; 097 protected static int numTables; 098 protected static TableName[] tableNames; 099 protected long sleepTime; 100 protected static Object lock = new Object(); 101 102 private static String BACKUP_ROOT_DIR = "backupIT"; 103 104 @Override 105 @Before 106 public void setUp() throws Exception { 107 util = new IntegrationTestingUtility(); 108 Configuration conf = util.getConfiguration(); 109 regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT); 110 regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT); 111 rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION); 112 numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS); 113 numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES); 114 sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT); 115 enableBackup(conf); 116 LOG.info("Initializing cluster with {} region servers.", regionServerCount); 117 util.initializeCluster(regionServerCount); 118 LOG.info("Cluster initialized and ready"); 119 } 120 121 @After 122 public void tearDown() throws IOException { 123 LOG.info("Cleaning up after test."); 124 if (util.isDistributedCluster()) { 125 deleteTablesIfAny(); 126 LOG.info("Cleaning up after test. Deleted tables"); 127 cleanUpBackupDir(); 128 } 129 LOG.info("Restoring cluster."); 130 util.restoreCluster(); 131 LOG.info("Cluster restored."); 132 } 133 134 @Override 135 public void setUpMonkey() throws Exception { 136 Policy p = 137 new PeriodicRandomActionPolicy(sleepTime, new RestartRandomRsExceptMetaAction(sleepTime)); 138 this.monkey = new PolicyBasedChaosMonkey(util, p); 139 startMonkey(); 140 } 141 142 private void deleteTablesIfAny() throws IOException { 143 for (TableName table : tableNames) { 144 util.deleteTableIfAny(table); 145 } 146 } 147 148 private void createTables() throws Exception { 149 tableNames = new TableName[numTables]; 150 for (int i = 0; i < numTables; i++) { 151 tableNames[i] = TableName.valueOf(CLASS_NAME + ".table." + i); 152 } 153 for (TableName table : tableNames) { 154 createTable(table); 155 } 156 } 157 158 private void enableBackup(Configuration conf) { 159 // Enable backup 160 conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true); 161 BackupManager.decorateMasterConfiguration(conf); 162 BackupManager.decorateRegionServerConfiguration(conf); 163 } 164 165 private void cleanUpBackupDir() throws IOException { 166 FileSystem fs = FileSystem.get(util.getConfiguration()); 167 fs.delete(new Path(BACKUP_ROOT_DIR), true); 168 } 169 170 @Test 171 public void testBackupRestore() throws Exception { 172 BACKUP_ROOT_DIR = util.getDataTestDirOnTestFS() + Path.SEPARATOR + BACKUP_ROOT_DIR; 173 createTables(); 174 runTestMulti(); 175 } 176 177 private void runTestMulti() throws IOException { 178 LOG.info("IT backup & restore started"); 179 Thread[] workers = new Thread[numTables]; 180 for (int i = 0; i < numTables; i++) { 181 final TableName table = tableNames[i]; 182 Runnable r = new Runnable() { 183 @Override 184 public void run() { 185 try { 186 runTestSingle(table); 187 } catch (IOException e) { 188 LOG.error("Failed", e); 189 Assert.fail(e.getMessage()); 190 } 191 } 192 }; 193 workers[i] = new Thread(r); 194 workers[i].start(); 195 } 196 // Wait all workers to finish 197 for (Thread t : workers) { 198 Uninterruptibles.joinUninterruptibly(t); 199 } 200 LOG.info("IT backup & restore finished"); 201 } 202 203 private void createTable(TableName tableName) throws Exception { 204 long startTime, endTime; 205 206 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 207 208 TableDescriptor desc = builder.build(); 209 ColumnFamilyDescriptorBuilder cbuilder = 210 ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_NAME.getBytes(Charset.defaultCharset())); 211 ColumnFamilyDescriptor[] columns = new ColumnFamilyDescriptor[] { cbuilder.build() }; 212 LOG.info("Creating table {} with {} splits.", tableName, 213 regionsCountPerServer * regionServerCount); 214 startTime = EnvironmentEdgeManager.currentTime(); 215 HBaseTestingUtility.createPreSplitLoadTestTable(util.getConfiguration(), desc, columns, 216 regionsCountPerServer); 217 util.waitTableAvailable(tableName); 218 endTime = EnvironmentEdgeManager.currentTime(); 219 LOG.info("Pre-split table created successfully in {}ms.", (endTime - startTime)); 220 } 221 222 private void loadData(TableName table, int numRows) throws IOException { 223 Connection conn = util.getConnection(); 224 // #0- insert some data to a table 225 Table t1 = conn.getTable(table); 226 util.loadRandomRows(t1, new byte[] { 'f' }, 100, numRows); 227 // flush table 228 conn.getAdmin().flush(TableName.valueOf(table.getName())); 229 } 230 231 private String backup(BackupRequest request, BackupAdmin client) throws IOException { 232 String backupId = client.backupTables(request); 233 return backupId; 234 } 235 236 private void restore(RestoreRequest request, BackupAdmin client) throws IOException { 237 client.restore(request); 238 } 239 240 private void merge(String[] backupIds, BackupAdmin client) throws IOException { 241 client.mergeBackups(backupIds); 242 } 243 244 private void runTestSingle(TableName table) throws IOException { 245 246 List<String> backupIds = new ArrayList<String>(); 247 248 try (Connection conn = util.getConnection(); Admin admin = conn.getAdmin(); 249 BackupAdmin client = new BackupAdminImpl(conn);) { 250 251 // #0- insert some data to table 'table' 252 loadData(table, rowsInIteration); 253 254 // #1 - create full backup for table first 255 LOG.info("create full backup image for {}", table); 256 List<TableName> tables = Lists.newArrayList(table); 257 BackupRequest.Builder builder = new BackupRequest.Builder(); 258 BackupRequest request = builder.withBackupType(BackupType.FULL).withTableList(tables) 259 .withTargetRootDir(BACKUP_ROOT_DIR).build(); 260 261 String backupIdFull = backup(request, client); 262 assertTrue(checkSucceeded(backupIdFull)); 263 264 backupIds.add(backupIdFull); 265 // Now continue with incremental backups 266 int count = 1; 267 while (count++ < numIterations) { 268 269 // Load data 270 loadData(table, rowsInIteration); 271 // Do incremental backup 272 builder = new BackupRequest.Builder(); 273 request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables) 274 .withTargetRootDir(BACKUP_ROOT_DIR).build(); 275 String backupId = backup(request, client); 276 assertTrue(checkSucceeded(backupId)); 277 backupIds.add(backupId); 278 279 // Restore incremental backup for table, with overwrite for previous backup 280 String previousBackupId = backupIds.get(backupIds.size() - 2); 281 restoreVerifyTable(conn, client, table, previousBackupId, rowsInIteration * (count - 1)); 282 // Restore incremental backup for table, with overwrite for last backup 283 restoreVerifyTable(conn, client, table, backupId, rowsInIteration * count); 284 } 285 // Now merge all incremental and restore 286 String[] incBackupIds = allIncremental(backupIds); 287 merge(incBackupIds, client); 288 // Restore last one 289 String backupId = incBackupIds[incBackupIds.length - 1]; 290 // restore incremental backup for table, with overwrite 291 TableName[] tablesRestoreIncMultiple = new TableName[] { table }; 292 restore(createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null, 293 true), client); 294 Table hTable = conn.getTable(table); 295 Assert.assertEquals(util.countRows(hTable), rowsInIteration * numIterations); 296 hTable.close(); 297 LOG.info("{} loop {} finished.", Thread.currentThread().getName(), (count - 1)); 298 } 299 } 300 301 private void restoreVerifyTable(Connection conn, BackupAdmin client, TableName table, 302 String backupId, long expectedRows) throws IOException { 303 304 TableName[] tablesRestoreIncMultiple = new TableName[] { table }; 305 restore( 306 createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null, true), 307 client); 308 Table hTable = conn.getTable(table); 309 Assert.assertEquals(expectedRows, util.countRows(hTable)); 310 hTable.close(); 311 } 312 313 private String[] allIncremental(List<String> backupIds) { 314 int size = backupIds.size(); 315 backupIds = backupIds.subList(1, size); 316 String[] arr = new String[size - 1]; 317 backupIds.toArray(arr); 318 return arr; 319 } 320 321 /** 322 * Check if backup is succeeded 323 * @param backupId pass backup ID to check status of 324 * @return status of backup 325 */ 326 protected boolean checkSucceeded(String backupId) throws IOException { 327 BackupInfo status = getBackupInfo(backupId); 328 if (status == null) { 329 return false; 330 } 331 return status.getState() == BackupState.COMPLETE; 332 } 333 334 private BackupInfo getBackupInfo(String backupId) throws IOException { 335 try (BackupSystemTable table = new BackupSystemTable(util.getConnection())) { 336 return table.readBackupInfo(backupId); 337 } 338 } 339 340 /** 341 * Get restore request. 342 * @param backupRootDir directory where backup is located 343 * @param backupId backup ID 344 * @param check check the backup 345 * @param fromTables table names to restore from 346 * @param toTables new table names to restore to 347 * @param isOverwrite overwrite the table(s) 348 * @return an instance of RestoreRequest 349 */ 350 public RestoreRequest createRestoreRequest(String backupRootDir, String backupId, boolean check, 351 TableName[] fromTables, TableName[] toTables, boolean isOverwrite) { 352 RestoreRequest.Builder builder = new RestoreRequest.Builder(); 353 return builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check) 354 .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build(); 355 } 356 357 @Override 358 public void setUpCluster() throws Exception { 359 util = getTestingUtil(getConf()); 360 enableBackup(getConf()); 361 LOG.debug("Initializing/checking cluster has {} servers", regionServerCount); 362 util.initializeCluster(regionServerCount); 363 LOG.debug("Done initializing/checking cluster"); 364 } 365 366 /** Returns status of CLI execution */ 367 @Override 368 public int runTestFromCommandLine() throws Exception { 369 // Check if backup is enabled 370 if (!BackupManager.isBackupEnabled(getConf())) { 371 System.err.println(BackupRestoreConstants.ENABLE_BACKUP); 372 return -1; 373 } 374 System.out.println(BackupRestoreConstants.VERIFY_BACKUP); 375 testBackupRestore(); 376 return 0; 377 } 378 379 @Override 380 public TableName getTablename() { 381 // That is only valid when Monkey is CALM (no monkey) 382 return null; 383 } 384 385 @Override 386 protected Set<String> getColumnFamilies() { 387 // That is only valid when Monkey is CALM (no monkey) 388 return null; 389 } 390 391 @Override 392 protected void addOptions() { 393 addOptWithArg(REGIONSERVER_COUNT_KEY, 394 "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'"); 395 addOptWithArg(REGION_COUNT_KEY, "Total number of regions. Default: " + DEFAULT_REGION_COUNT); 396 addOptWithArg(ROWS_PER_ITERATION_KEY, 397 "Total number of data rows to be loaded during one iteration." + " Default: " 398 + DEFAULT_ROWS_IN_ITERATION); 399 addOptWithArg(NUM_ITERATIONS_KEY, 400 "Total number iterations." + " Default: " + DEFAULT_NUM_ITERATIONS); 401 addOptWithArg(NUMBER_OF_TABLES_KEY, 402 "Total number of tables in the test." + " Default: " + DEFAULT_NUMBER_OF_TABLES); 403 addOptWithArg(SLEEP_TIME_KEY, "Sleep time of chaos monkey in ms " 404 + "to restart random region server. Default: " + SLEEP_TIME_DEFAULT); 405 } 406 407 @Override 408 protected void processOptions(CommandLine cmd) { 409 super.processOptions(cmd); 410 regionsCountPerServer = Integer 411 .parseInt(cmd.getOptionValue(REGION_COUNT_KEY, Integer.toString(DEFAULT_REGION_COUNT))); 412 regionServerCount = Integer.parseInt( 413 cmd.getOptionValue(REGIONSERVER_COUNT_KEY, Integer.toString(DEFAULT_REGIONSERVER_COUNT))); 414 rowsInIteration = Integer.parseInt( 415 cmd.getOptionValue(ROWS_PER_ITERATION_KEY, Integer.toString(DEFAULT_ROWS_IN_ITERATION))); 416 numIterations = Integer 417 .parseInt(cmd.getOptionValue(NUM_ITERATIONS_KEY, Integer.toString(DEFAULT_NUM_ITERATIONS))); 418 numTables = Integer.parseInt( 419 cmd.getOptionValue(NUMBER_OF_TABLES_KEY, Integer.toString(DEFAULT_NUMBER_OF_TABLES))); 420 sleepTime = 421 Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, Long.toString(SLEEP_TIME_DEFAULT))); 422 423 LOG.info(MoreObjects.toStringHelper("Parsed Options") 424 .add(REGION_COUNT_KEY, regionsCountPerServer).add(REGIONSERVER_COUNT_KEY, regionServerCount) 425 .add(ROWS_PER_ITERATION_KEY, rowsInIteration).add(NUM_ITERATIONS_KEY, numIterations) 426 .add(NUMBER_OF_TABLES_KEY, numTables).add(SLEEP_TIME_KEY, sleepTime).toString()); 427 } 428 429 /** 430 * Main method 431 * @param args argument list 432 */ 433 public static void main(String[] args) throws Exception { 434 Configuration conf = HBaseConfiguration.create(); 435 IntegrationTestingUtility.setUseDistributedCluster(conf); 436 int status = ToolRunner.run(conf, new IntegrationTestBackupRestore(), args); 437 System.exit(status); 438 } 439}