001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.apache.hadoop.hbase.IntegrationTestingUtility.createPreSplitLoadTestTable; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.nio.charset.Charset; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Set; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.backup.BackupAdmin; 032import org.apache.hadoop.hbase.backup.BackupInfo; 033import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; 034import org.apache.hadoop.hbase.backup.BackupRequest; 035import org.apache.hadoop.hbase.backup.BackupRestoreConstants; 036import org.apache.hadoop.hbase.backup.BackupType; 037import org.apache.hadoop.hbase.backup.RestoreRequest; 038import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; 039import org.apache.hadoop.hbase.backup.impl.BackupManager; 040import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; 041import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction; 042import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey; 043import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy; 044import org.apache.hadoop.hbase.chaos.policies.Policy; 045import org.apache.hadoop.hbase.client.Admin; 046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 048import org.apache.hadoop.hbase.client.Connection; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.client.TableDescriptor; 051import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 052import org.apache.hadoop.hbase.testclassification.IntegrationTests; 053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 054import org.apache.hadoop.util.ToolRunner; 055import org.junit.After; 056import org.junit.Assert; 057import org.junit.Before; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 064import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 065import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; 066import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 067 068/** 069 * An integration test to detect regressions in HBASE-7912. Create a table with many regions, load 070 * data, perform series backup/load operations, then restore and verify data 071 * @see <a href="https://issues.apache.org/jira/browse/HBASE-7912">HBASE-7912</a> 072 * @see <a href="https://issues.apache.org/jira/browse/HBASE-14123">HBASE-14123</a> 073 */ 074@Category(IntegrationTests.class) 075public class IntegrationTestBackupRestore extends IntegrationTestBase { 076 private static final String CLASS_NAME = IntegrationTestBackupRestore.class.getSimpleName(); 077 protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBackupRestore.class); 078 protected static final String NUMBER_OF_TABLES_KEY = "num_tables"; 079 protected static final String COLUMN_NAME = "f"; 080 protected static final String REGION_COUNT_KEY = "regions_per_rs"; 081 protected static final String REGIONSERVER_COUNT_KEY = "region_servers"; 082 protected static final String ROWS_PER_ITERATION_KEY = "rows_in_iteration"; 083 protected static final String NUM_ITERATIONS_KEY = "num_iterations"; 084 protected static final int DEFAULT_REGION_COUNT = 10; 085 protected static final int DEFAULT_REGIONSERVER_COUNT = 5; 086 protected static final int DEFAULT_NUMBER_OF_TABLES = 1; 087 protected static final int DEFAULT_NUM_ITERATIONS = 10; 088 protected static final int DEFAULT_ROWS_IN_ITERATION = 500000; 089 protected static final String SLEEP_TIME_KEY = "sleeptime"; 090 // short default interval because tests don't run very long. 091 protected static final long SLEEP_TIME_DEFAULT = 50000L; 092 093 protected static int rowsInIteration; 094 protected static int regionsCountPerServer; 095 protected static int regionServerCount; 096 097 protected static int numIterations; 098 protected static int numTables; 099 protected static TableName[] tableNames; 100 protected long sleepTime; 101 protected static Object lock = new Object(); 102 103 private static String BACKUP_ROOT_DIR = "backupIT"; 104 105 @Override 106 @Before 107 public void setUp() throws Exception { 108 util = new IntegrationTestingUtility(); 109 Configuration conf = util.getConfiguration(); 110 regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT); 111 regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT); 112 rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION); 113 numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS); 114 numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES); 115 sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT); 116 enableBackup(conf); 117 LOG.info("Initializing cluster with {} region servers.", regionServerCount); 118 util.initializeCluster(regionServerCount); 119 LOG.info("Cluster initialized and ready"); 120 } 121 122 @After 123 public void tearDown() throws IOException { 124 LOG.info("Cleaning up after test."); 125 if (util.isDistributedCluster()) { 126 deleteTablesIfAny(); 127 LOG.info("Cleaning up after test. Deleted tables"); 128 cleanUpBackupDir(); 129 } 130 LOG.info("Restoring cluster."); 131 util.restoreCluster(); 132 LOG.info("Cluster restored."); 133 } 134 135 @Override 136 public void setUpMonkey() throws Exception { 137 Policy p = 138 new PeriodicRandomActionPolicy(sleepTime, new RestartRandomRsExceptMetaAction(sleepTime)); 139 this.monkey = new PolicyBasedChaosMonkey(util, p); 140 startMonkey(); 141 } 142 143 private void deleteTablesIfAny() throws IOException { 144 for (TableName table : tableNames) { 145 util.deleteTableIfAny(table); 146 } 147 } 148 149 private void createTables() throws Exception { 150 tableNames = new TableName[numTables]; 151 for (int i = 0; i < numTables; i++) { 152 tableNames[i] = TableName.valueOf(CLASS_NAME + ".table." + i); 153 } 154 for (TableName table : tableNames) { 155 createTable(table); 156 } 157 } 158 159 private void enableBackup(Configuration conf) { 160 // Enable backup 161 conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true); 162 BackupManager.decorateMasterConfiguration(conf); 163 BackupManager.decorateRegionServerConfiguration(conf); 164 } 165 166 private void cleanUpBackupDir() throws IOException { 167 FileSystem fs = FileSystem.get(util.getConfiguration()); 168 fs.delete(new Path(BACKUP_ROOT_DIR), true); 169 } 170 171 @Test 172 public void testBackupRestore() throws Exception { 173 BACKUP_ROOT_DIR = util.getDataTestDirOnTestFS() + Path.SEPARATOR + BACKUP_ROOT_DIR; 174 createTables(); 175 runTestMulti(); 176 } 177 178 private void runTestMulti() throws IOException { 179 LOG.info("IT backup & restore started"); 180 Thread[] workers = new Thread[numTables]; 181 for (int i = 0; i < numTables; i++) { 182 final TableName table = tableNames[i]; 183 Runnable r = new Runnable() { 184 @Override 185 public void run() { 186 try { 187 runTestSingle(table); 188 } catch (IOException e) { 189 LOG.error("Failed", e); 190 Assert.fail(e.getMessage()); 191 } 192 } 193 }; 194 workers[i] = new Thread(r); 195 workers[i].start(); 196 } 197 // Wait all workers to finish 198 for (Thread t : workers) { 199 Uninterruptibles.joinUninterruptibly(t); 200 } 201 LOG.info("IT backup & restore finished"); 202 } 203 204 private void createTable(TableName tableName) throws Exception { 205 long startTime, endTime; 206 207 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 208 209 TableDescriptor desc = builder.build(); 210 ColumnFamilyDescriptorBuilder cbuilder = 211 ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_NAME.getBytes(Charset.defaultCharset())); 212 ColumnFamilyDescriptor[] columns = new ColumnFamilyDescriptor[] { cbuilder.build() }; 213 LOG.info("Creating table {} with {} splits.", tableName, 214 regionsCountPerServer * regionServerCount); 215 startTime = EnvironmentEdgeManager.currentTime(); 216 createPreSplitLoadTestTable(util.getConfiguration(), desc, columns, regionsCountPerServer); 217 util.waitTableAvailable(tableName); 218 endTime = EnvironmentEdgeManager.currentTime(); 219 LOG.info("Pre-split table created successfully in {}ms.", (endTime - startTime)); 220 } 221 222 private void loadData(TableName table, int numRows) throws IOException { 223 Connection conn = util.getConnection(); 224 // #0- insert some data to a table 225 Table t1 = conn.getTable(table); 226 util.loadRandomRows(t1, new byte[] { 'f' }, 100, numRows); 227 // flush table 228 conn.getAdmin().flush(TableName.valueOf(table.getName())); 229 } 230 231 private String backup(BackupRequest request, BackupAdmin client) throws IOException { 232 String backupId = client.backupTables(request); 233 return backupId; 234 } 235 236 private void restore(RestoreRequest request, BackupAdmin client) throws IOException { 237 client.restore(request); 238 } 239 240 private void merge(String[] backupIds, BackupAdmin client) throws IOException { 241 client.mergeBackups(backupIds); 242 } 243 244 private void runTestSingle(TableName table) throws IOException { 245 246 List<String> backupIds = new ArrayList<String>(); 247 248 try (Connection conn = util.getConnection(); Admin admin = conn.getAdmin(); 249 BackupAdmin client = new BackupAdminImpl(conn);) { 250 251 // #0- insert some data to table 'table' 252 loadData(table, rowsInIteration); 253 254 // #1 - create full backup for table first 255 LOG.info("create full backup image for {}", table); 256 List<TableName> tables = Lists.newArrayList(table); 257 BackupRequest.Builder builder = new BackupRequest.Builder(); 258 BackupRequest request = builder.withBackupType(BackupType.FULL).withTableList(tables) 259 .withTargetRootDir(BACKUP_ROOT_DIR).build(); 260 261 String backupIdFull = backup(request, client); 262 assertTrue(checkSucceeded(backupIdFull)); 263 264 backupIds.add(backupIdFull); 265 // Now continue with incremental backups 266 int count = 1; 267 while (count++ < numIterations) { 268 269 // Load data 270 loadData(table, rowsInIteration); 271 // Do incremental backup 272 builder = new BackupRequest.Builder(); 273 request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables) 274 .withTargetRootDir(BACKUP_ROOT_DIR).build(); 275 String backupId = backup(request, client); 276 assertTrue(checkSucceeded(backupId)); 277 backupIds.add(backupId); 278 279 // Restore incremental backup for table, with overwrite for previous backup 280 String previousBackupId = backupIds.get(backupIds.size() - 2); 281 restoreVerifyTable(conn, client, table, previousBackupId, rowsInIteration * (count - 1)); 282 // Restore incremental backup for table, with overwrite for last backup 283 restoreVerifyTable(conn, client, table, backupId, rowsInIteration * count); 284 } 285 // Now merge all incremental and restore 286 String[] incBackupIds = allIncremental(backupIds); 287 merge(incBackupIds, client); 288 // Restore last one 289 String backupId = incBackupIds[incBackupIds.length - 1]; 290 // restore incremental backup for table, with overwrite 291 TableName[] tablesRestoreIncMultiple = new TableName[] { table }; 292 restore(createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null, 293 true), client); 294 Table hTable = conn.getTable(table); 295 Assert.assertEquals(util.countRows(hTable), rowsInIteration * numIterations); 296 hTable.close(); 297 LOG.info("{} loop {} finished.", Thread.currentThread().getName(), (count - 1)); 298 } 299 } 300 301 private void restoreVerifyTable(Connection conn, BackupAdmin client, TableName table, 302 String backupId, long expectedRows) throws IOException { 303 304 TableName[] tablesRestoreIncMultiple = new TableName[] { table }; 305 restore( 306 createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null, true), 307 client); 308 Table hTable = conn.getTable(table); 309 Assert.assertEquals(expectedRows, util.countRows(hTable)); 310 hTable.close(); 311 } 312 313 private String[] allIncremental(List<String> backupIds) { 314 int size = backupIds.size(); 315 backupIds = backupIds.subList(1, size); 316 String[] arr = new String[size - 1]; 317 backupIds.toArray(arr); 318 return arr; 319 } 320 321 /** Returns status of backup */ 322 protected boolean checkSucceeded(String backupId) throws IOException { 323 BackupInfo status = getBackupInfo(backupId); 324 if (status == null) { 325 return false; 326 } 327 return status.getState() == BackupState.COMPLETE; 328 } 329 330 private BackupInfo getBackupInfo(String backupId) throws IOException { 331 try (BackupSystemTable table = new BackupSystemTable(util.getConnection())) { 332 return table.readBackupInfo(backupId); 333 } 334 } 335 336 /** 337 * Get restore request. 338 * @param backupRootDir directory where backup is located 339 * @param backupId backup ID 340 * @param check check the backup 341 * @param fromTables table names to restore from 342 * @param toTables new table names to restore to 343 * @param isOverwrite overwrite the table(s) 344 * @return an instance of RestoreRequest 345 */ 346 public RestoreRequest createRestoreRequest(String backupRootDir, String backupId, boolean check, 347 TableName[] fromTables, TableName[] toTables, boolean isOverwrite) { 348 RestoreRequest.Builder builder = new RestoreRequest.Builder(); 349 return builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check) 350 .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build(); 351 } 352 353 @Override 354 public void setUpCluster() throws Exception { 355 util = getTestingUtil(getConf()); 356 enableBackup(getConf()); 357 LOG.debug("Initializing/checking cluster has {} servers", regionServerCount); 358 util.initializeCluster(regionServerCount); 359 LOG.debug("Done initializing/checking cluster"); 360 } 361 362 /** Returns status of CLI execution */ 363 @Override 364 public int runTestFromCommandLine() throws Exception { 365 // Check if backup is enabled 366 if (!BackupManager.isBackupEnabled(getConf())) { 367 System.err.println(BackupRestoreConstants.ENABLE_BACKUP); 368 return -1; 369 } 370 System.out.println(BackupRestoreConstants.VERIFY_BACKUP); 371 testBackupRestore(); 372 return 0; 373 } 374 375 @Override 376 public TableName getTablename() { 377 // That is only valid when Monkey is CALM (no monkey) 378 return null; 379 } 380 381 @Override 382 protected Set<String> getColumnFamilies() { 383 // That is only valid when Monkey is CALM (no monkey) 384 return null; 385 } 386 387 @Override 388 protected void addOptions() { 389 addOptWithArg(REGIONSERVER_COUNT_KEY, 390 "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'"); 391 addOptWithArg(REGION_COUNT_KEY, "Total number of regions. Default: " + DEFAULT_REGION_COUNT); 392 addOptWithArg(ROWS_PER_ITERATION_KEY, 393 "Total number of data rows to be loaded during one iteration." + " Default: " 394 + DEFAULT_ROWS_IN_ITERATION); 395 addOptWithArg(NUM_ITERATIONS_KEY, 396 "Total number iterations." + " Default: " + DEFAULT_NUM_ITERATIONS); 397 addOptWithArg(NUMBER_OF_TABLES_KEY, 398 "Total number of tables in the test." + " Default: " + DEFAULT_NUMBER_OF_TABLES); 399 addOptWithArg(SLEEP_TIME_KEY, "Sleep time of chaos monkey in ms " 400 + "to restart random region server. Default: " + SLEEP_TIME_DEFAULT); 401 } 402 403 @Override 404 protected void processOptions(CommandLine cmd) { 405 super.processOptions(cmd); 406 regionsCountPerServer = Integer 407 .parseInt(cmd.getOptionValue(REGION_COUNT_KEY, Integer.toString(DEFAULT_REGION_COUNT))); 408 regionServerCount = Integer.parseInt( 409 cmd.getOptionValue(REGIONSERVER_COUNT_KEY, Integer.toString(DEFAULT_REGIONSERVER_COUNT))); 410 rowsInIteration = Integer.parseInt( 411 cmd.getOptionValue(ROWS_PER_ITERATION_KEY, Integer.toString(DEFAULT_ROWS_IN_ITERATION))); 412 numIterations = Integer 413 .parseInt(cmd.getOptionValue(NUM_ITERATIONS_KEY, Integer.toString(DEFAULT_NUM_ITERATIONS))); 414 numTables = Integer.parseInt( 415 cmd.getOptionValue(NUMBER_OF_TABLES_KEY, Integer.toString(DEFAULT_NUMBER_OF_TABLES))); 416 sleepTime = 417 Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, Long.toString(SLEEP_TIME_DEFAULT))); 418 419 LOG.info(MoreObjects.toStringHelper("Parsed Options") 420 .add(REGION_COUNT_KEY, regionsCountPerServer).add(REGIONSERVER_COUNT_KEY, regionServerCount) 421 .add(ROWS_PER_ITERATION_KEY, rowsInIteration).add(NUM_ITERATIONS_KEY, numIterations) 422 .add(NUMBER_OF_TABLES_KEY, numTables).add(SLEEP_TIME_KEY, sleepTime).toString()); 423 } 424 425 public static void main(String[] args) throws Exception { 426 Configuration conf = HBaseConfiguration.create(); 427 IntegrationTestingUtility.setUseDistributedCluster(conf); 428 int status = ToolRunner.run(conf, new IntegrationTestBackupRestore(), args); 429 System.exit(status); 430 } 431}