001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup; 019 020import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import org.apache.commons.lang3.StringUtils; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; 034import org.apache.hadoop.hbase.backup.impl.BackupCommands; 035import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; 036import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob; 037import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob; 038import org.apache.hadoop.hbase.backup.util.BackupUtils; 039import org.apache.hadoop.hbase.client.Admin; 040import org.apache.hadoop.hbase.client.Connection; 041import org.apache.hadoop.hbase.client.ConnectionFactory; 042import org.apache.hadoop.hbase.client.Table; 043import org.apache.hadoop.hbase.testclassification.LargeTests; 044import org.apache.hadoop.hbase.util.Pair; 045import org.junit.Assert; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 053 054@Category(LargeTests.class) 055public class TestIncrementalBackupMergeWithFailures extends TestBackupBase { 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestIncrementalBackupMergeWithFailures.class); 060 061 private static final Logger LOG = 062 LoggerFactory.getLogger(TestIncrementalBackupMergeWithFailures.class); 063 064 enum FailurePhase { 065 PHASE1, 066 PHASE2, 067 PHASE3, 068 PHASE4 069 } 070 071 public final static String FAILURE_PHASE_KEY = "failurePhase"; 072 073 static class BackupMergeJobWithFailures extends MapReduceBackupMergeJob { 074 FailurePhase failurePhase; 075 076 @Override 077 public void setConf(Configuration conf) { 078 super.setConf(conf); 079 String val = conf.get(FAILURE_PHASE_KEY); 080 if (val != null) { 081 failurePhase = FailurePhase.valueOf(val); 082 } else { 083 Assert.fail("Failure phase is not set"); 084 } 085 } 086 087 /** 088 * This is the exact copy of parent's run() with injections of different types of failures 089 */ 090 @Override 091 public void run(String[] backupIds) throws IOException { 092 String bulkOutputConfKey; 093 094 // TODO : run player on remote cluster 095 player = new MapReduceHFileSplitterJob(); 096 bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY; 097 // Player reads all files in arbitrary directory structure and creates 098 // a Map task for each file 099 String bids = StringUtils.join(backupIds, ","); 100 101 if (LOG.isDebugEnabled()) { 102 LOG.debug("Merge backup images " + bids); 103 } 104 105 List<Pair<TableName, Path>> processedTableList = new ArrayList<>(); 106 boolean finishedTables = false; 107 Connection conn = ConnectionFactory.createConnection(getConf()); 108 BackupSystemTable table = new BackupSystemTable(conn); 109 FileSystem fs = FileSystem.get(getConf()); 110 111 try { 112 // Start backup exclusive operation 113 table.startBackupExclusiveOperation(); 114 // Start merge operation 115 table.startMergeOperation(backupIds); 116 117 // Select most recent backup id 118 String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds); 119 120 TableName[] tableNames = getTableNamesInBackupImages(backupIds); 121 122 BackupInfo bInfo = table.readBackupInfo(backupIds[0]); 123 String backupRoot = bInfo.getBackupRootDir(); 124 // PHASE 1 125 checkFailure(FailurePhase.PHASE1); 126 127 for (int i = 0; i < tableNames.length; i++) { 128 LOG.info("Merge backup images for " + tableNames[i]); 129 130 // Find input directories for table 131 Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds); 132 String dirs = StringUtils.join(dirPaths, ","); 133 Path bulkOutputPath = BackupUtils.getBulkOutputDir( 134 BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false); 135 // Delete content if exists 136 if (fs.exists(bulkOutputPath)) { 137 if (!fs.delete(bulkOutputPath, true)) { 138 LOG.warn("Can not delete: " + bulkOutputPath); 139 } 140 } 141 Configuration conf = getConf(); 142 conf.set(bulkOutputConfKey, bulkOutputPath.toString()); 143 String[] playerArgs = { dirs, tableNames[i].getNameAsString() }; 144 145 // PHASE 2 146 checkFailure(FailurePhase.PHASE2); 147 player.setConf(getConf()); 148 int result = player.run(playerArgs); 149 if (succeeded(result)) { 150 // Add to processed table list 151 processedTableList.add(new Pair<>(tableNames[i], bulkOutputPath)); 152 } else { 153 throw new IOException("Can not merge backup images for " + dirs 154 + " (check Hadoop/MR and HBase logs). Player return code =" + result); 155 } 156 LOG.debug("Merge Job finished:" + result); 157 } 158 List<TableName> tableList = toTableNameList(processedTableList); 159 // PHASE 3 160 checkFailure(FailurePhase.PHASE3); 161 table.updateProcessedTablesForMerge(tableList); 162 finishedTables = true; 163 164 // (modification of a backup file system) 165 // Move existing mergedBackupId data into tmp directory 166 // we will need it later in case of a failure 167 Path tmpBackupDir = 168 HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot, mergedBackupId); 169 Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId); 170 if (!fs.rename(backupDirPath, tmpBackupDir)) { 171 throw new IOException("Failed to rename " + backupDirPath + " to " + tmpBackupDir); 172 } else { 173 LOG.debug("Renamed " + backupDirPath + " to " + tmpBackupDir); 174 } 175 // Move new data into backup dest 176 for (Pair<TableName, Path> tn : processedTableList) { 177 moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId); 178 } 179 checkFailure(FailurePhase.PHASE4); 180 // Update backup manifest 181 List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId); 182 updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete); 183 // Copy meta files back from tmp to backup dir 184 copyMetaData(fs, tmpBackupDir, backupDirPath); 185 // Delete tmp dir (Rename back during repair) 186 if (!fs.delete(tmpBackupDir, true)) { 187 // WARN and ignore 188 LOG.warn("Could not delete tmp dir: " + tmpBackupDir); 189 } 190 // Delete old data 191 deleteBackupImages(backupsToDelete, conn, fs, backupRoot); 192 // Finish merge session 193 table.finishMergeOperation(); 194 // Release lock 195 table.finishBackupExclusiveOperation(); 196 } catch (RuntimeException e) { 197 throw e; 198 } catch (Exception e) { 199 LOG.error(e.toString(), e); 200 if (!finishedTables) { 201 // cleanup bulk directories and finish merge 202 // merge MUST be repeated (no need for repair) 203 cleanupBulkLoadDirs(fs, toPathList(processedTableList)); 204 table.finishMergeOperation(); 205 table.finishBackupExclusiveOperation(); 206 throw new IOException("Backup merge operation failed, you should try it again", e); 207 } else { 208 // backup repair must be run 209 throw new IOException( 210 "Backup merge operation failed, run backup repair tool to restore system's integrity", 211 e); 212 } 213 } finally { 214 table.close(); 215 conn.close(); 216 } 217 } 218 219 private void checkFailure(FailurePhase phase) throws IOException { 220 if (failurePhase != null && failurePhase == phase) { 221 throw new IOException(phase.toString()); 222 } 223 } 224 } 225 226 @Test 227 public void TestIncBackupMergeRestore() throws Exception { 228 int ADD_ROWS = 99; 229 // #1 - create full backup for all tables 230 LOG.info("create full backup image for all tables"); 231 232 List<TableName> tables = Lists.newArrayList(table1, table2); 233 // Set custom Merge Job implementation 234 conf1.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS, 235 BackupMergeJobWithFailures.class, BackupMergeJob.class); 236 237 Connection conn = ConnectionFactory.createConnection(conf1); 238 239 Admin admin = conn.getAdmin(); 240 BackupAdminImpl client = new BackupAdminImpl(conn); 241 242 BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR); 243 String backupIdFull = client.backupTables(request); 244 245 assertTrue(checkSucceeded(backupIdFull)); 246 247 // #2 - insert some data to table1 248 Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS); 249 LOG.debug("writing " + ADD_ROWS + " rows to " + table1); 250 251 Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS); 252 t1.close(); 253 LOG.debug("written " + ADD_ROWS + " rows to " + table1); 254 255 Table t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS); 256 257 Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS); 258 t2.close(); 259 LOG.debug("written " + ADD_ROWS + " rows to " + table2); 260 261 // #3 - incremental backup for multiple tables 262 tables = Lists.newArrayList(table1, table2); 263 request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); 264 String backupIdIncMultiple = client.backupTables(request); 265 266 assertTrue(checkSucceeded(backupIdIncMultiple)); 267 268 t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS); 269 t1.close(); 270 271 t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS); 272 t2.close(); 273 274 // #3 - incremental backup for multiple tables 275 request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); 276 String backupIdIncMultiple2 = client.backupTables(request); 277 assertTrue(checkSucceeded(backupIdIncMultiple2)); 278 // #4 Merge backup images with failures 279 280 for (FailurePhase phase : FailurePhase.values()) { 281 Configuration conf = conn.getConfiguration(); 282 283 conf.set(FAILURE_PHASE_KEY, phase.toString()); 284 285 try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) { 286 String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 }; 287 bAdmin.mergeBackups(backups); 288 Assert.fail("Expected IOException"); 289 } catch (IOException e) { 290 BackupSystemTable table = new BackupSystemTable(conn); 291 if (phase.ordinal() < FailurePhase.PHASE4.ordinal()) { 292 // No need to repair: 293 // Both Merge and backup exclusive operations are finished 294 assertFalse(table.isMergeInProgress()); 295 try { 296 table.finishBackupExclusiveOperation(); 297 Assert.fail("IOException is expected"); 298 } catch (IOException ee) { 299 // Expected 300 } 301 } else { 302 // Repair is required 303 assertTrue(table.isMergeInProgress()); 304 try { 305 table.startBackupExclusiveOperation(); 306 Assert.fail("IOException is expected"); 307 } catch (IOException ee) { 308 // Expected - clean up before proceeding 309 // table.finishMergeOperation(); 310 // table.finishBackupExclusiveOperation(); 311 } 312 } 313 table.close(); 314 LOG.debug("Expected :" + e.getMessage()); 315 } 316 } 317 // Now merge w/o failures 318 Configuration conf = conn.getConfiguration(); 319 conf.unset(FAILURE_PHASE_KEY); 320 conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS); 321 // Now run repair 322 BackupSystemTable sysTable = new BackupSystemTable(conn); 323 BackupCommands.RepairCommand.repairFailedBackupMergeIfAny(conn, sysTable); 324 // Now repeat merge 325 try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) { 326 String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 }; 327 bAdmin.mergeBackups(backups); 328 } 329 330 // #6 - restore incremental backup for multiple tables, with overwrite 331 TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 }; 332 TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore }; 333 client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false, 334 tablesRestoreIncMultiple, tablesMapIncMultiple, true)); 335 336 Table hTable = conn.getTable(table1_restore); 337 LOG.debug("After incremental restore: " + hTable.getDescriptor()); 338 LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows"); 339 Assert.assertEquals(TEST_UTIL.countRows(hTable, famName), NB_ROWS_IN_BATCH + 2 * ADD_ROWS); 340 341 hTable.close(); 342 343 hTable = conn.getTable(table2_restore); 344 Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS); 345 hTable.close(); 346 347 admin.close(); 348 conn.close(); 349 } 350}