/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_ATTEMPTS_PAUSE_MS_KEY;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_MAX_ATTEMPTS;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Full table backup implementation
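 * <p>
 * A minimal usage sketch (hypothetical backup id and paths; in practice this client is
 * constructed by the backup admin machinery rather than instantiated directly):
 *
 * <pre>{@code
 * try (Connection conn = ConnectionFactory.createConnection(conf)) {
 *   BackupRequest request = new BackupRequest.Builder()
 *     .withBackupType(BackupType.FULL)
 *     .withTableList(Collections.singletonList(TableName.valueOf("t1")))
 *     .withTargetRootDir("hdfs://namenode:8020/backup")
 *     .build();
 *   TableBackupClient client = new FullTableBackupClient(conn, "backup_1700000000000", request);
 *   client.execute();
 * }
 * }</pre>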
 */
@InterfaceAudience.Private
public class FullTableBackupClient extends TableBackupClient {
  private static final Logger LOG = LoggerFactory.getLogger(FullTableBackupClient.class);

  public FullTableBackupClient() {
  }

  public FullTableBackupClient(final Connection conn, final String backupId, BackupRequest request)
    throws IOException {
    super(conn, backupId, request);
  }
  /**
   * Do snapshot copy.
   * @param backupInfo backup info
   * @throws Exception if the snapshot export of any table fails
   */
  protected void snapshotCopy(BackupInfo backupInfo) throws Exception {
    LOG.info("Snapshot copy is starting.");

    // set overall backup phase: snapshot_copy
    backupInfo.setPhase(BackupPhase.SNAPSHOTCOPY);

    // Call ExportSnapshot to copy files based on the hbase snapshots taken for this backup.
    // ExportSnapshot only supports exporting a single snapshot, so loop over the tables.
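    // By default, BackupRestoreFactory resolves this to the MapReduce-based implementation
    // (MapReduceBackupCopyJob), which drives the ExportSnapshot tool.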
    BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);

    // number of snapshots matches number of tables
    int numOfSnapshots = backupInfo.getSnapshotNames().size();

    LOG.debug("There are " + numOfSnapshots + " snapshots to be copied.");

    for (TableName table : backupInfo.getTables()) {
      // Currently we set the sub-task progress by counting table snapshots; we could
      // calculate the real file sizes for the percentage in the future.
      // backupCopier.setSubTaskPercntgInWholeTask(1f / numOfSnapshots);
      int res;
      ArrayList<String> argsList = new ArrayList<>();
      argsList.add("-snapshot");
      argsList.add(backupInfo.getSnapshotName(table));
      argsList.add("-copy-to");
      argsList.add(backupInfo.getTableBackupDir(table));
      if (backupInfo.getBandwidth() > -1) {
        argsList.add("-bandwidth");
        argsList.add(String.valueOf(backupInfo.getBandwidth()));
      }
      if (backupInfo.getWorkers() > -1) {
        argsList.add("-mappers");
        argsList.add(String.valueOf(backupInfo.getWorkers()));
      }
      if (backupInfo.getNoChecksumVerify()) {
        argsList.add("-no-checksum-verify");
      }

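      // The assembled arguments mirror the ExportSnapshot command line, e.g. (hypothetical
      // values): -snapshot snapshot_1700000000000_default_t1
      //          -copy-to hdfs://namenode:8020/backup/backup_1700000000000/default/t1
      //          -bandwidth 100 -mappers 4 -no-checksum-verify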
      String[] args = argsList.toArray(new String[0]);

      String jobname = "Full-Backup_" + backupInfo.getBackupId() + "_" + table.getNameAsString();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting snapshot copy job name to: " + jobname);
      }
      conf.set(JOB_NAME_CONF_KEY, jobname);

      LOG.debug("Copy snapshot " + args[1] + " to " + args[3]);
      res = copyService.copy(backupInfo, backupManager, conf, BackupType.FULL, args);

      // if one snapshot export fails, do not continue with the remaining snapshots
      if (res != 0) {
        LOG.error("Exporting snapshot " + args[1] + " failed with return code: " + res + ".");

        throw new IOException("Failed to export snapshot " + args[1] + " to " + args[3]
          + " with return code " + res);
      }

      conf.unset(JOB_NAME_CONF_KEY);
      LOG.info("Snapshot copy " + args[1] + " finished.");
    }
  }

  /**
   * Backup request execution.
   * @throws IOException if the execution of the backup fails
   */
  @Override
  public void execute() throws IOException {
    try (Admin admin = conn.getAdmin()) {
      // Begin BACKUP
      beginBackup(backupManager, backupInfo);
      String savedStartCode;
      boolean firstBackup;
      // do snapshot for full table backup

      savedStartCode = backupManager.readBackupStartCode();
      firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
      if (firstBackup) {
        // This is our first backup. Write a marker to the backup system table so that
        // we can hold on to the logs while we do the backup.
        backupManager.writeBackupStartCode(0L);
      }
      // We roll the log here before we take the snapshot. It is possible there is duplicate
      // data in the log that is already in the snapshot, but if we rolled after the snapshot,
      // we could lose data.
      // A better approach would be to roll the log on each RS as part of the same global
      // procedure as the snapshot.
      LOG.info("Execute roll log procedure for full backup ...");

      Map<String, String> props = new HashMap<>();
      props.put("backupRoot", backupInfo.getBackupRootDir());
      admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
        LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);

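      // Each region server rolls its WAL and records the roll timestamp in the backup
      // system table; those per-RS timestamps are read back here.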
      newTimestamps = backupManager.readRegionServerLastLogRollResult();

      // SNAPSHOT_TABLES:
      backupInfo.setPhase(BackupPhase.SNAPSHOT);
      for (TableName tableName : tableList) {
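        // Snapshot names look like (hypothetical timestamp): snapshot_1700000000000_default_t1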
        String snapshotName = "snapshot_" + EnvironmentEdgeManager.currentTime() + "_"
          + tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString();

        snapshotTable(admin, tableName, snapshotName);
        backupInfo.setSnapshotName(tableName, snapshotName);
      }

      // SNAPSHOT_COPY:
      // do snapshot copy
      LOG.debug("snapshot copy for " + backupId);
      snapshotCopy(backupInfo);
      // Updates incremental backup table set
      backupManager.addIncrementalBackupTableSet(backupInfo.getTables());
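      // Registering the tables here makes them part of the incremental backup table set,
      // i.e. eligible for subsequent incremental backups rooted at this full backup.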

      // BACKUP_COMPLETE:
      // Set the overall backup status to complete. After this checkpoint, the backup is
      // allowed to finish even if a cancel request comes in.
      backupInfo.setState(BackupState.COMPLETE);
      // The table list in backupInfo is good for both full backup and incremental backup.
      // For incremental backup, it contains the incremental backup table set.
      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
        backupManager.readLogTimestampMap();

      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
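      // The new start code is the minimum of the most recent log-roll timestamps across all
      // region servers; WALs written before it are not needed by future incremental backups.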
      Long newStartCode =
        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
      backupManager.writeBackupStartCode(newStartCode);

      // backup complete
      completeBackup(conn, backupInfo, BackupType.FULL, conf);
    } catch (Exception e) {
      failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException: ",
        BackupType.FULL, conf);
      throw new IOException(e);
    }
  }

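  /**
   * Takes a snapshot of the given table, retrying on failure. The number of attempts and the
   * pause between attempts are controlled by {@code BACKUP_MAX_ATTEMPTS_KEY} and
   * {@code BACKUP_ATTEMPTS_PAUSE_MS_KEY} (see BackupRestoreConstants).
   * @throws IOException if all snapshot attempts fail, or if the thread is interrupted while
   *           waiting between attempts
   */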
  protected void snapshotTable(Admin admin, TableName tableName, String snapshotName)
    throws IOException {
    int maxAttempts = conf.getInt(BACKUP_MAX_ATTEMPTS_KEY, DEFAULT_BACKUP_MAX_ATTEMPTS);
    int pause = conf.getInt(BACKUP_ATTEMPTS_PAUSE_MS_KEY, DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS);
    int attempts = 0;

    while (attempts++ < maxAttempts) {
      try {
        admin.snapshot(snapshotName, tableName);
        return;
      } catch (IOException ee) {
        LOG.warn("Snapshot attempt " + attempts + " failed for table " + tableName
          + ", sleeping for " + pause + "ms", ee);
        if (attempts < maxAttempts) {
          try {
            Thread.sleep(pause);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
          }
        }
      }
    }
    throw new IOException("Failed to snapshot table " + tableName);
  }
}