001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.HashMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.Map.Entry;
028import java.util.Objects;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.LocatedFileStatus;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.fs.RemoteIterator;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.NamespaceDescriptor;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
041import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
042import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
043import org.apache.hadoop.hbase.backup.impl.BackupManager;
044import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
045import org.apache.hadoop.hbase.backup.impl.FullTableBackupClient;
046import org.apache.hadoop.hbase.backup.impl.IncrementalBackupManager;
047import org.apache.hadoop.hbase.backup.impl.IncrementalTableBackupClient;
048import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
049import org.apache.hadoop.hbase.backup.util.BackupUtils;
050import org.apache.hadoop.hbase.client.Admin;
051import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
052import org.apache.hadoop.hbase.client.Connection;
053import org.apache.hadoop.hbase.client.ConnectionFactory;
054import org.apache.hadoop.hbase.client.Durability;
055import org.apache.hadoop.hbase.client.Put;
056import org.apache.hadoop.hbase.client.Table;
057import org.apache.hadoop.hbase.client.TableDescriptor;
058import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
059import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
060import org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner;
061import org.apache.hadoop.hbase.regionserver.LogRoller;
062import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
063import org.apache.hadoop.hbase.security.UserProvider;
064import org.apache.hadoop.hbase.security.access.SecureTestUtil;
065import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
066import org.apache.hadoop.hbase.util.Bytes;
067import org.apache.hadoop.hbase.util.CommonFSUtils;
068import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
069import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
070import org.apache.hadoop.hbase.wal.WALFactory;
071import org.junit.AfterClass;
072import org.junit.Before;
073import org.junit.BeforeClass;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077/**
078 * This class is only a base for other integration-level backup tests. Do not add tests here.
079 * TestBackupSmallTests is where tests that don't require bring machines up/down should go All other
080 * tests should have their own classes and extend this one
081 */
082public class TestBackupBase {
083  private static final Logger LOG = LoggerFactory.getLogger(TestBackupBase.class);
084
085  protected static HBaseTestingUtil TEST_UTIL;
086  protected static HBaseTestingUtil TEST_UTIL2;
087  protected static Configuration conf1;
088  protected static Configuration conf2;
089
090  protected static TableName table1 = TableName.valueOf("table1");
091  protected static TableDescriptor table1Desc;
092  protected static TableName table2 = TableName.valueOf("table2");
093  protected static TableName table3 = TableName.valueOf("table3");
094  protected static TableName table4 = TableName.valueOf("table4");
095
096  protected static TableName table1_restore = TableName.valueOf("default:table1");
097  protected static TableName table2_restore = TableName.valueOf("ns2:table2");
098  protected static TableName table3_restore = TableName.valueOf("ns3:table3_restore");
099
100  protected static final int NB_ROWS_IN_BATCH = 99;
101  protected static final byte[] qualName = Bytes.toBytes("q1");
102  protected static final byte[] famName = Bytes.toBytes("f");
103
104  protected static String BACKUP_ROOT_DIR;
105  protected static String BACKUP_REMOTE_ROOT_DIR;
106  protected static String provider = "defaultProvider";
107  protected static boolean secure = false;
108
109  protected static boolean autoRestoreOnFailure;
110  protected static boolean useSecondCluster;
111
112  static class IncrementalTableBackupClientForTest extends IncrementalTableBackupClient {
113    public IncrementalTableBackupClientForTest() {
114    }
115
116    public IncrementalTableBackupClientForTest(Connection conn, String backupId,
117      BackupRequest request) throws IOException {
118      super(conn, backupId, request);
119    }
120
121    @Before
122    public void ensurePreviousBackupTestsAreCleanedUp() throws Exception {
123      // Every operation here may not be necessary for any given test,
124      // some often being no-ops. the goal is to help ensure atomicity
125      // of that tests that implement TestBackupBase
126      try (BackupAdmin backupAdmin = getBackupAdmin()) {
127        backupManager.finishBackupSession();
128        backupAdmin.listBackupSets().forEach(backupSet -> {
129          try {
130            backupAdmin.deleteBackupSet(backupSet.getName());
131          } catch (IOException ignored) {
132          }
133        });
134      } catch (Exception ignored) {
135      }
136      Arrays.stream(TEST_UTIL.getAdmin().listTableNames())
137        .filter(tableName -> !tableName.isSystemTable()).forEach(tableName -> {
138          try {
139            TEST_UTIL.truncateTable(tableName);
140          } catch (IOException ignored) {
141          }
142        });
143      TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(rst -> {
144        try {
145          LogRoller walRoller = rst.getRegionServer().getWalRoller();
146          walRoller.requestRollAll();
147          walRoller.waitUntilWalRollFinished();
148        } catch (Exception ignored) {
149        }
150      });
151    }
152
153    @Override
154    public void execute() throws IOException {
155      // case INCREMENTAL_COPY:
156      try {
157        // case PREPARE_INCREMENTAL:
158        failStageIf(Stage.stage_0);
159        beginBackup(backupManager, backupInfo);
160
161        failStageIf(Stage.stage_1);
162        backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
163        LOG.debug("For incremental backup, current table set is "
164          + backupManager.getIncrementalBackupTableSet());
165        newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
166        // copy out the table and region info files for each table
167        BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
168        // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
169        convertWALsToHFiles();
170        incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
171          backupInfo.getBackupRootDir());
172        failStageIf(Stage.stage_2);
173
174        // case INCR_BACKUP_COMPLETE:
175        // set overall backup status: complete. Here we make sure to complete the backup.
176        // After this checkpoint, even if entering cancel process, will let the backup finished
177        // Set the previousTimestampMap which is before this current log roll to the manifest.
178        Map<TableName, Map<String, Long>> previousTimestampMap =
179          backupManager.readLogTimestampMap();
180        backupInfo.setIncrTimestampMap(previousTimestampMap);
181
182        // The table list in backupInfo is good for both full backup and incremental backup.
183        // For incremental backup, it contains the incremental backup table set.
184        backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
185        failStageIf(Stage.stage_3);
186
187        Map<TableName, Map<String, Long>> newTableSetTimestampMap =
188          backupManager.readLogTimestampMap();
189
190        Long newStartCode =
191          BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
192        backupManager.writeBackupStartCode(newStartCode);
193
194        handleBulkLoad(backupInfo.getTableNames());
195        failStageIf(Stage.stage_4);
196
197        // backup complete
198        completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf);
199
200      } catch (Exception e) {
201        failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
202          BackupType.INCREMENTAL, conf);
203        throw new IOException(e);
204      }
205    }
206  }
207
208  static class FullTableBackupClientForTest extends FullTableBackupClient {
209    public FullTableBackupClientForTest() {
210    }
211
212    public FullTableBackupClientForTest(Connection conn, String backupId, BackupRequest request)
213      throws IOException {
214      super(conn, backupId, request);
215    }
216
217    @Override
218    public void execute() throws IOException {
219      // Get the stage ID to fail on
220      try (Admin admin = conn.getAdmin()) {
221        // Begin BACKUP
222        beginBackup(backupManager, backupInfo);
223        failStageIf(Stage.stage_0);
224        String savedStartCode;
225        boolean firstBackup;
226        // do snapshot for full table backup
227        savedStartCode = backupManager.readBackupStartCode();
228        firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
229        if (firstBackup) {
230          // This is our first backup. Let's put some marker to system table so that we can hold the
231          // logs while we do the backup.
232          backupManager.writeBackupStartCode(0L);
233        }
234        failStageIf(Stage.stage_1);
235        // We roll log here before we do the snapshot. It is possible there is duplicate data
236        // in the log that is already in the snapshot. But if we do it after the snapshot, we
237        // could have data loss.
238        // A better approach is to do the roll log on each RS in the same global procedure as
239        // the snapshot.
240        LOG.info("Execute roll log procedure for full backup ...");
241
242        Map<String, String> props = new HashMap<>();
243        props.put("backupRoot", backupInfo.getBackupRootDir());
244        admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
245          LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
246        failStageIf(Stage.stage_2);
247        newTimestamps = backupManager.readRegionServerLastLogRollResult();
248
249        // SNAPSHOT_TABLES:
250        backupInfo.setPhase(BackupPhase.SNAPSHOT);
251        for (TableName tableName : tableList) {
252          String snapshotName = "snapshot_" + Long.toString(EnvironmentEdgeManager.currentTime())
253            + "_" + tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString();
254
255          snapshotTable(admin, tableName, snapshotName);
256          backupInfo.setSnapshotName(tableName, snapshotName);
257        }
258        failStageIf(Stage.stage_3);
259        // SNAPSHOT_COPY:
260        // do snapshot copy
261        LOG.debug("snapshot copy for " + backupId);
262        snapshotCopy(backupInfo);
263        // Updates incremental backup table set
264        backupManager.addIncrementalBackupTableSet(backupInfo.getTables());
265
266        // BACKUP_COMPLETE:
267        // set overall backup status: complete. Here we make sure to complete the backup.
268        // After this checkpoint, even if entering cancel process, will let the backup finished
269        backupInfo.setState(BackupState.COMPLETE);
270        // The table list in backupInfo is good for both full backup and incremental backup.
271        // For incremental backup, it contains the incremental backup table set.
272        backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
273
274        Map<TableName, Map<String, Long>> newTableSetTimestampMap =
275          backupManager.readLogTimestampMap();
276
277        Long newStartCode =
278          BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
279        backupManager.writeBackupStartCode(newStartCode);
280        failStageIf(Stage.stage_4);
281        // backup complete
282        completeBackup(conn, backupInfo, BackupType.FULL, conf);
283
284      } catch (Exception e) {
285
286        if (autoRestoreOnFailure) {
287          failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException : ",
288            BackupType.FULL, conf);
289        }
290        throw new IOException(e);
291      }
292    }
293  }
294
295  public static void setUpHelper() throws Exception {
296    BACKUP_ROOT_DIR = Path.SEPARATOR + "backupUT";
297    BACKUP_REMOTE_ROOT_DIR = Path.SEPARATOR + "backupUT";
298
299    if (secure) {
300      // set the always on security provider
301      UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(),
302        HadoopSecurityEnabledUserProviderForTesting.class);
303      // setup configuration
304      SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
305    }
306    conf1.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
307    BackupManager.decorateMasterConfiguration(conf1);
308    BackupManager.decorateRegionServerConfiguration(conf1);
309    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
310    // Set TTL for old WALs to 1 sec to enforce fast cleaning of an archived
311    // WAL files
312    conf1.setLong(TimeToLiveLogCleaner.TTL_CONF_KEY, 1000);
313    conf1.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, 1000);
314
315    // Set MultiWAL (with 2 default WAL files per RS)
316    conf1.set(WALFactory.WAL_PROVIDER, provider);
317    TEST_UTIL.startMiniCluster();
318
319    if (useSecondCluster) {
320      conf2 = HBaseConfiguration.create(conf1);
321      conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
322      TEST_UTIL2 = new HBaseTestingUtil(conf2);
323      TEST_UTIL2.setZkCluster(TEST_UTIL.getZkCluster());
324      TEST_UTIL2.startMiniDFSCluster(3);
325      String root2 = TEST_UTIL2.getConfiguration().get("fs.defaultFS");
326      Path p = new Path(new Path(root2), "/tmp/wal");
327      CommonFSUtils.setWALRootDir(TEST_UTIL2.getConfiguration(), p);
328      TEST_UTIL2.startMiniCluster();
329    }
330    conf1 = TEST_UTIL.getConfiguration();
331
332    TEST_UTIL.startMiniMapReduceCluster();
333    BACKUP_ROOT_DIR =
334      new Path(new Path(TEST_UTIL.getConfiguration().get("fs.defaultFS")), BACKUP_ROOT_DIR)
335        .toString();
336    LOG.info("ROOTDIR " + BACKUP_ROOT_DIR);
337    if (useSecondCluster) {
338      BACKUP_REMOTE_ROOT_DIR = new Path(
339        new Path(TEST_UTIL2.getConfiguration().get("fs.defaultFS")) + BACKUP_REMOTE_ROOT_DIR)
340          .toString();
341      LOG.info("REMOTE ROOTDIR " + BACKUP_REMOTE_ROOT_DIR);
342    }
343    createTables();
344    populateFromMasterConfig(TEST_UTIL.getHBaseCluster().getMaster().getConfiguration(), conf1);
345  }
346
347  /**
348   * Setup Cluster with appropriate configurations before running tests.
349   * @throws Exception if starting the mini cluster or setting up the tables fails
350   */
351  @BeforeClass
352  public static void setUp() throws Exception {
353    TEST_UTIL = new HBaseTestingUtil();
354    conf1 = TEST_UTIL.getConfiguration();
355    autoRestoreOnFailure = true;
356    useSecondCluster = false;
357    setUpHelper();
358  }
359
360  private static void populateFromMasterConfig(Configuration masterConf, Configuration conf) {
361    Iterator<Entry<String, String>> it = masterConf.iterator();
362    while (it.hasNext()) {
363      Entry<String, String> e = it.next();
364      conf.set(e.getKey(), e.getValue());
365    }
366  }
367
368  @AfterClass
369  public static void tearDown() throws Exception {
370    try {
371      SnapshotTestingUtils.deleteAllSnapshots(TEST_UTIL.getAdmin());
372    } catch (Exception e) {
373    }
374    SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
375    if (useSecondCluster) {
376      TEST_UTIL2.shutdownMiniCluster();
377    }
378    TEST_UTIL.shutdownMiniCluster();
379    TEST_UTIL.shutdownMiniMapReduceCluster();
380    autoRestoreOnFailure = true;
381    useSecondCluster = false;
382  }
383
384  Table insertIntoTable(Connection conn, TableName table, byte[] family, int id, int numRows)
385    throws IOException {
386    Table t = conn.getTable(table);
387    Put p1;
388    for (int i = 0; i < numRows; i++) {
389      p1 = new Put(Bytes.toBytes("row-" + table + "-" + id + "-" + i));
390      p1.addColumn(family, qualName, Bytes.toBytes("val" + i));
391      t.put(p1);
392    }
393    return t;
394  }
395
396  protected BackupRequest createBackupRequest(BackupType type, List<TableName> tables,
397    String path) {
398    return createBackupRequest(type, tables, path, false);
399  }
400
401  protected BackupRequest createBackupRequest(BackupType type, List<TableName> tables, String path,
402    boolean noChecksumVerify) {
403    BackupRequest.Builder builder = new BackupRequest.Builder();
404    BackupRequest request = builder.withBackupType(type).withTableList(tables)
405      .withTargetRootDir(path).withNoChecksumVerify(noChecksumVerify).build();
406    return request;
407  }
408
409  protected String backupTables(BackupType type, List<TableName> tables, String path)
410    throws IOException {
411    Connection conn = null;
412    BackupAdmin badmin = null;
413    String backupId;
414    try {
415      conn = ConnectionFactory.createConnection(conf1);
416      badmin = new BackupAdminImpl(conn);
417      BackupRequest request = createBackupRequest(type, new ArrayList<>(tables), path);
418      backupId = badmin.backupTables(request);
419    } finally {
420      if (badmin != null) {
421        badmin.close();
422      }
423      if (conn != null) {
424        conn.close();
425      }
426    }
427    return backupId;
428  }
429
430  protected String fullTableBackup(List<TableName> tables) throws IOException {
431    return backupTables(BackupType.FULL, tables, BACKUP_ROOT_DIR);
432  }
433
434  protected String incrementalTableBackup(List<TableName> tables) throws IOException {
435    return backupTables(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
436  }
437
438  protected static void loadTable(Table table) throws Exception {
439    Put p; // 100 + 1 row to t1_syncup
440    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
441      p = new Put(Bytes.toBytes("row" + i));
442      p.setDurability(Durability.SKIP_WAL);
443      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
444      table.put(p);
445    }
446  }
447
448  protected static void createTables() throws Exception {
449    long tid = EnvironmentEdgeManager.currentTime();
450    table1 = TableName.valueOf("test-" + tid);
451    Admin ha = TEST_UTIL.getAdmin();
452
453    // Create namespaces
454    ha.createNamespace(NamespaceDescriptor.create("ns1").build());
455    ha.createNamespace(NamespaceDescriptor.create("ns2").build());
456    ha.createNamespace(NamespaceDescriptor.create("ns3").build());
457    ha.createNamespace(NamespaceDescriptor.create("ns4").build());
458
459    TableDescriptor desc = TableDescriptorBuilder.newBuilder(table1)
460      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(famName)).build();
461    ha.createTable(desc);
462    table1Desc = desc;
463    Connection conn = ConnectionFactory.createConnection(conf1);
464    Table table = conn.getTable(table1);
465    loadTable(table);
466    table.close();
467    table2 = TableName.valueOf("ns2:test-" + tid + 1);
468    desc = TableDescriptorBuilder.newBuilder(table2)
469      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(famName)).build();
470    ha.createTable(desc);
471    table = conn.getTable(table2);
472    loadTable(table);
473    table.close();
474    table3 = TableName.valueOf("ns3:test-" + tid + 2);
475    table = TEST_UTIL.createTable(table3, famName);
476    table.close();
477    table4 = TableName.valueOf("ns4:test-" + tid + 3);
478    table = TEST_UTIL.createTable(table4, famName);
479    table.close();
480    ha.close();
481    conn.close();
482  }
483
484  protected boolean checkSucceeded(String backupId) throws IOException {
485    BackupInfo status = getBackupInfo(backupId);
486
487    if (status == null) {
488      return false;
489    }
490
491    return status.getState() == BackupState.COMPLETE;
492  }
493
494  protected boolean checkFailed(String backupId) throws IOException {
495    BackupInfo status = getBackupInfo(backupId);
496
497    if (status == null) {
498      return false;
499    }
500
501    return status.getState() == BackupState.FAILED;
502  }
503
504  private BackupInfo getBackupInfo(String backupId) throws IOException {
505    try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
506      BackupInfo status = table.readBackupInfo(backupId);
507      return status;
508    }
509  }
510
511  protected static BackupAdmin getBackupAdmin() throws IOException {
512    return new BackupAdminImpl(TEST_UTIL.getConnection());
513  }
514
515  /**
516   * Helper method
517   */
518  protected List<TableName> toList(String... args) {
519    List<TableName> ret = new ArrayList<>();
520    for (int i = 0; i < args.length; i++) {
521      ret.add(TableName.valueOf(args[i]));
522    }
523    return ret;
524  }
525
526  protected List<FileStatus> getListOfWALFiles(Configuration c) throws IOException {
527    Path logRoot = new Path(CommonFSUtils.getWALRootDir(c), HConstants.HREGION_LOGDIR_NAME);
528    FileSystem fs = logRoot.getFileSystem(c);
529    RemoteIterator<LocatedFileStatus> it = fs.listFiles(logRoot, true);
530    List<FileStatus> logFiles = new ArrayList<FileStatus>();
531    while (it.hasNext()) {
532      LocatedFileStatus lfs = it.next();
533      if (lfs.isFile() && !AbstractFSWALProvider.isMetaFile(lfs.getPath())) {
534        logFiles.add(lfs);
535        LOG.info(Objects.toString(lfs));
536      }
537    }
538    return logFiles;
539  }
540
541  protected void dumpBackupDir() throws IOException {
542    // Dump Backup Dir
543    FileSystem fs = FileSystem.get(conf1);
544    RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path(BACKUP_ROOT_DIR), true);
545    while (it.hasNext()) {
546      LOG.debug(Objects.toString(it.next().getPath()));
547    }
548  }
549}