/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;

import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupCommands;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

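/**
 * Tests that a failed incremental backup merge leaves the backup system in a recoverable state.
 * An IOException is injected at each {@link FailurePhase} of the merge job: a failure before the
 * destination backup image is modified (PHASE1-PHASE3) only requires the merge to be repeated,
 * while a failure after the image has been modified (PHASE4) requires the backup repair tool to
 * restore the system's integrity.
 */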
@Category(LargeTests.class)
public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestIncrementalBackupMergeWithFailures.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestIncrementalBackupMergeWithFailures.class);

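  /**
   * Phases of {@link BackupMergeJobWithFailures#run(String[])} at which a failure can be
   * injected: PHASE1 - before any table is processed, PHASE2 - before the HFile splitter job
   * runs, PHASE3 - before the processed tables are recorded in the backup system table, and
   * PHASE4 - after the merged data has been moved into the backup destination.
   */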
  enum FailurePhase {
    PHASE1,
    PHASE2,
    PHASE3,
    PHASE4
  }

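  /** Configuration key that selects the phase at which the merge job fails. */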
  public final static String FAILURE_PHASE_KEY = "failurePhase";

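  /**
   * A {@link MapReduceBackupMergeJob} whose run() method is an instrumented copy of the
   * parent's: it throws an IOException at the phase named by the FAILURE_PHASE_KEY
   * configuration property.
   */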
  static class BackupMergeJobWithFailures extends MapReduceBackupMergeJob {
    FailurePhase failurePhase;

    @Override
    public void setConf(Configuration conf) {
      super.setConf(conf);
      String val = conf.get(FAILURE_PHASE_KEY);
      if (val != null) {
        failurePhase = FailurePhase.valueOf(val);
      } else {
        Assert.fail("Failure phase is not set");
      }
    }

    /**
     * An exact copy of the parent's run() method, with failure injection added at each
     * {@link FailurePhase}.
     */
    @Override
    public void run(String[] backupIds) throws IOException {
      String bulkOutputConfKey;

      // TODO : run player on remote cluster
      player = new MapReduceHFileSplitterJob();
      bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
      // Player reads all files in arbitrary directory structure and creates
      // a Map task for each file
      String bids = StringUtils.join(backupIds, ",");

      if (LOG.isDebugEnabled()) {
        LOG.debug("Merge backup images " + bids);
      }

      List<Pair<TableName, Path>> processedTableList = new ArrayList<>();
      boolean finishedTables = false;
      Connection conn = ConnectionFactory.createConnection(getConf());
      BackupSystemTable table = new BackupSystemTable(conn);
      FileSystem fs = FileSystem.get(getConf());

      try {
        // Start backup exclusive operation
        table.startBackupExclusiveOperation();
        // Start merge operation
        table.startMergeOperation(backupIds);

        // Select most recent backup id
        String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);

        TableName[] tableNames = getTableNamesInBackupImages(backupIds);

        BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
        String backupRoot = bInfo.getBackupRootDir();
        // PHASE 1
        checkFailure(FailurePhase.PHASE1);

        for (int i = 0; i < tableNames.length; i++) {
          LOG.info("Merge backup images for " + tableNames[i]);

          // Find input directories for table
          Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
          String dirs = StringUtils.join(dirPaths, ",");
          Path bulkOutputPath = BackupUtils.getBulkOutputDir(
            BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false);
          // Delete content if exists
          if (fs.exists(bulkOutputPath)) {
            if (!fs.delete(bulkOutputPath, true)) {
              LOG.warn("Can not delete: " + bulkOutputPath);
            }
          }
          Configuration conf = getConf();
          conf.set(bulkOutputConfKey, bulkOutputPath.toString());
          String[] playerArgs = { dirs, tableNames[i].getNameAsString() };

          // PHASE 2
          checkFailure(FailurePhase.PHASE2);
          player.setConf(getConf());
          int result = player.run(playerArgs);
          if (succeeded(result)) {
            // Add to processed table list
            processedTableList.add(new Pair<>(tableNames[i], bulkOutputPath));
          } else {
            throw new IOException("Can not merge backup images for " + dirs
              + " (check Hadoop/MR and HBase logs). Player return code =" + result);
          }
          LOG.debug("Merge Job finished:" + result);
        }
        List<TableName> tableList = toTableNameList(processedTableList);
        // PHASE 3
        checkFailure(FailurePhase.PHASE3);
        table.updateProcessedTablesForMerge(tableList);
        finishedTables = true;

        // From here on we modify the backup file system:
        // move the existing mergedBackupId data into a tmp directory;
        // we will need it later in case of a failure
        Path tmpBackupDir =
          HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot, mergedBackupId);
        Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);
        if (!fs.rename(backupDirPath, tmpBackupDir)) {
          throw new IOException("Failed to rename " + backupDirPath + " to " + tmpBackupDir);
        } else {
          LOG.debug("Renamed " + backupDirPath + " to " + tmpBackupDir);
        }
        // Move new data into backup dest
        for (Pair<TableName, Path> tn : processedTableList) {
          moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
        }
        checkFailure(FailurePhase.PHASE4);
        // Update backup manifest
        List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
        updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
        // Copy meta files back from tmp to backup dir
        copyMetaData(fs, tmpBackupDir, backupDirPath);
        // Delete tmp dir (Rename back during repair)
        if (!fs.delete(tmpBackupDir, true)) {
          // WARN and ignore
          LOG.warn("Could not delete tmp dir: " + tmpBackupDir);
        }
        // Delete old data
        deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
        // Finish merge session
        table.finishMergeOperation();
        // Release lock
        table.finishBackupExclusiveOperation();
      } catch (RuntimeException e) {
        throw e;
      } catch (Exception e) {
        LOG.error(e.toString(), e);
        if (!finishedTables) {
          // cleanup bulk directories and finish merge
          // merge MUST be repeated (no need for repair)
          cleanupBulkLoadDirs(fs, toPathList(processedTableList));
          table.finishMergeOperation();
          table.finishBackupExclusiveOperation();
          throw new IOException("Backup merge operation failed, you should try it again", e);
        } else {
          // backup repair must be run
          throw new IOException(
            "Backup merge operation failed, run backup repair tool to restore system's integrity",
            e);
        }
      } finally {
        table.close();
        conn.close();
      }
    }

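    /**
     * Throws an IOException if the currently configured failure phase matches {@code phase}.
     */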
    private void checkFailure(FailurePhase phase) throws IOException {
      if (failurePhase != null && failurePhase == phase) {
        throw new IOException(phase.toString());
      }
    }
  }

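  /**
   * Creates one full and two incremental backups, then merges the two incremental images while
   * injecting a failure at every {@link FailurePhase}. For phases before PHASE4 the merge and
   * the exclusive-operation lock must already be released; for PHASE4 the merge must be left in
   * progress so that the repair tool can recover it. Finally, the merge is repeated without
   * failures and the merged image is restored and verified.
   */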
  @Test
  public void testIncBackupMergeRestore() throws Exception {
    int ADD_ROWS = 99;
    // #1 - create full backup for all tables
    LOG.info("create full backup image for all tables");

    List<TableName> tables = Lists.newArrayList(table1, table2);
    // Set custom Merge Job implementation
    conf1.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS,
      BackupMergeJobWithFailures.class, BackupMergeJob.class);

    Connection conn = ConnectionFactory.createConnection(conf1);

    Admin admin = conn.getAdmin();
    BackupAdminImpl client = new BackupAdminImpl(conn);

    BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
    String backupIdFull = client.backupTables(request);

    assertTrue(checkSucceeded(backupIdFull));

    // #2 - insert some data to table1
    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);

    Assert.assertEquals(NB_ROWS_IN_BATCH + ADD_ROWS, TEST_UTIL.countRows(t1));
    t1.close();
    LOG.debug("written " + ADD_ROWS + " rows to " + table1);

    Table t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);

    Assert.assertEquals(NB_ROWS_IN_BATCH + ADD_ROWS, TEST_UTIL.countRows(t2));
    t2.close();
    LOG.debug("written " + ADD_ROWS + " rows to " + table2);

    // #3 - incremental backup for multiple tables
    tables = Lists.newArrayList(table1, table2);
    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
    String backupIdIncMultiple = client.backupTables(request);

    assertTrue(checkSucceeded(backupIdIncMultiple));

    t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS);
    t1.close();

    t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS);
    t2.close();

    // #4 - second incremental backup for multiple tables
    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
    String backupIdIncMultiple2 = client.backupTables(request);
    assertTrue(checkSucceeded(backupIdIncMultiple2));
    // #5 - merge backup images with failures

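    // Run the merge once for each failure phase; every attempt must fail with an IOException,
    // and the state of the backup system table is then verified for that phase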
    for (FailurePhase phase : FailurePhase.values()) {
      Configuration conf = conn.getConfiguration();

      conf.set(FAILURE_PHASE_KEY, phase.toString());

      try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
        String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
        bAdmin.mergeBackups(backups);
        Assert.fail("Expected IOException");
      } catch (IOException e) {
        BackupSystemTable table = new BackupSystemTable(conn);
        if (phase.ordinal() < FailurePhase.PHASE4.ordinal()) {
          // No need to repair:
          // Both Merge and backup exclusive operations are finished
          assertFalse(table.isMergeInProgress());
          try {
            table.finishBackupExclusiveOperation();
            Assert.fail("IOException is expected");
          } catch (IOException ee) {
            // Expected
          }
        } else {
          // Repair is required
          assertTrue(table.isMergeInProgress());
          try {
            table.startBackupExclusiveOperation();
            Assert.fail("IOException is expected");
          } catch (IOException ee) {
            // Expected - the merge is left in progress; the repair tool run after this loop
            // performs the cleanup
            // table.finishMergeOperation();
            // table.finishBackupExclusiveOperation();
          }
        }
        table.close();
        LOG.debug("Expected: " + e.getMessage());
      }
    }
    // Now merge w/o failures
    Configuration conf = conn.getConfiguration();
    conf.unset(FAILURE_PHASE_KEY);
    conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS);
    // Now run repair
    BackupSystemTable sysTable = new BackupSystemTable(conn);
    BackupCommands.RepairCommand.repairFailedBackupMergeIfAny(conn, sysTable);
    // Now repeat merge
    try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
      String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
      bAdmin.mergeBackups(backups);
    }

    // #6 - restore incremental backup for multiple tables, with overwrite
    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
      tablesRestoreIncMultiple, tablesMapIncMultiple, true));

    Table hTable = conn.getTable(table1_restore);
    LOG.debug("After incremental restore: " + hTable.getDescriptor());
    LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
    Assert.assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, TEST_UTIL.countRows(hTable, famName));

    hTable.close();

    hTable = conn.getTable(table2_restore);
    Assert.assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, TEST_UTIL.countRows(hTable));
    hTable.close();

    admin.close();
    conn.close();
  }
}