/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_ATTEMPTS_PAUSE_MS_KEY;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_MAX_ATTEMPTS;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Full table backup implementation.
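 * <p>
 * A minimal usage sketch (illustrative only; in practice backups are driven through the
 * {@code BackupAdmin} API, which allocates the backup id and constructs this client):
 *
 * <pre>{@code
 * try (Connection conn = ConnectionFactory.createConnection(conf)) {
 *   BackupRequest request = new BackupRequest.Builder()
 *     .withBackupType(BackupType.FULL)
 *     .withTableList(tables)            // tables to back up
 *     .withTargetRootDir(backupRootDir) // root directory for backup images
 *     .build();
 *   new FullTableBackupClient(conn, backupId, request).execute();
 * }
 * }</pre>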
 */
@InterfaceAudience.Private
public class FullTableBackupClient extends TableBackupClient {
  private static final Logger LOG = LoggerFactory.getLogger(FullTableBackupClient.class);

  public FullTableBackupClient() {
  }

  public FullTableBackupClient(final Connection conn, final String backupId, BackupRequest request)
    throws IOException {
    super(conn, backupId, request);
  }
  /**
   * Copies the snapshot of each table in the backup to the backup destination.
   * @param backupInfo backup info
   * @throws Exception if exporting any snapshot fails
   */
  protected void snapshotCopy(BackupInfo backupInfo) throws Exception {
    LOG.info("Snapshot copy is starting.");

    // set overall backup phase: snapshot_copy
    backupInfo.setPhase(BackupPhase.SNAPSHOTCOPY);

    // Call ExportSnapshot to copy files based on the hbase snapshots taken for this backup.
    // ExportSnapshot only supports exporting a single snapshot, so we loop over the tables.
    BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
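    // The copy job implementation is pluggable through BackupRestoreFactory; the default
    // implementation wraps the ExportSnapshot MapReduce job.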

    // the number of snapshots matches the number of tables
    int numOfSnapshots = backupInfo.getSnapshotNames().size();

    LOG.debug("There are " + numOfSnapshots + " snapshots to be copied.");

    for (TableName table : backupInfo.getTables()) {
      // Currently we weight each sub copy task equally, by the table snapshot count; we could
      // use the real file sizes to calculate the percentage in the future.
      // backupCopier.setSubTaskPercntgInWholeTask(1f / numOfSnapshots);
      int res;
      ArrayList<String> argsList = new ArrayList<>();
      argsList.add("-snapshot");
      argsList.add(backupInfo.getSnapshotName(table));
      argsList.add("-copy-to");
      argsList.add(backupInfo.getTableBackupDir(table));
      if (backupInfo.getBandwidth() > -1) {
        argsList.add("-bandwidth");
        argsList.add(String.valueOf(backupInfo.getBandwidth()));
      }
      if (backupInfo.getWorkers() > -1) {
        argsList.add("-mappers");
        argsList.add(String.valueOf(backupInfo.getWorkers()));
      }
      if (backupInfo.getNoChecksumVerify()) {
        argsList.add("-no-checksum-verify");
      }

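      // The assembled ExportSnapshot argument list has the shape:
      //   -snapshot <name> -copy-to <dir> [-bandwidth <n>] [-mappers <n>] [-no-checksum-verify]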
      String[] args = argsList.toArray(new String[0]);

      String jobname = "Full-Backup_" + backupInfo.getBackupId() + "_" + table.getNameAsString();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting snapshot copy job name to: " + jobname);
      }
      conf.set(JOB_NAME_CONF_KEY, jobname);

      LOG.debug("Copy snapshot " + args[1] + " to " + args[3]);
      res = copyService.copy(backupInfo, backupManager, conf, BackupType.FULL, args);

      // if one snapshot export fails, do not continue with the remaining snapshots
      if (res != 0) {
        LOG.error("Exporting snapshot " + args[1] + " failed with return code: " + res + ".");

        throw new IOException("Failed to export snapshot " + args[1] + " to " + args[3]
          + ", return code " + res);
      }

      conf.unset(JOB_NAME_CONF_KEY);
      LOG.info("Snapshot copy " + args[1] + " finished.");
    }
  }

  /**
   * Backup request execution.
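   * <p>
   * Overall flow: write an initial start code on the first backup, roll region server WALs,
   * snapshot each table, export the snapshots via {@link #snapshotCopy(BackupInfo)}, record
   * the post-roll log timestamps and the new start code, and finally drop the bulk-load
   * tracking rows that are now covered by the snapshots.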
   * @throws IOException if the execution of the backup fails
   */
  @Override
  public void execute() throws IOException {
    try (Admin admin = conn.getAdmin()) {
      // Begin BACKUP
      beginBackup(backupManager, backupInfo);

      // do snapshot for full table backup
      String savedStartCode = backupManager.readBackupStartCode();
      boolean firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
      if (firstBackup) {
        // This is our first backup. Write a marker to the system table so that we can hold
        // the logs while we do the backup.
        backupManager.writeBackupStartCode(0L);
      }
      // We roll the logs here before we take the snapshots. It is possible there is duplicate
      // data in the logs that is already in the snapshots. But if we rolled the logs after the
      // snapshots, we could have data loss.
      // A better approach would be to roll the logs on each RS in the same global procedure as
      // the snapshot.
      LOG.info("Execute roll log procedure for full backup ...");

      // Gather the bulk loads being tracked by the system, which can be deleted (since their
      // data will be part of the snapshots being taken). We gather this list before taking the
      // actual snapshots for the same reason as the log rolls.
      List<BulkLoad> bulkLoadsToDelete = backupManager.readBulkloadRows(tableList);

      Map<String, String> props = new HashMap<>();
      props.put("backupRoot", backupInfo.getBackupRootDir());
      admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
        LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);

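      // The roll-log procedure blocks until every region server has rolled its WAL; each RS
      // records its post-roll log timestamp in the backup system table, which we read back here.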
      newTimestamps = backupManager.readRegionServerLastLogRollResult();

      // SNAPSHOT_TABLES:
      backupInfo.setPhase(BackupPhase.SNAPSHOT);
      for (TableName tableName : tableList) {
        String snapshotName = "snapshot_" + Long.toString(EnvironmentEdgeManager.currentTime())
          + "_" + tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString();

        snapshotTable(admin, tableName, snapshotName);
        backupInfo.setSnapshotName(tableName, snapshotName);
      }

      // SNAPSHOT_COPY:
      // do snapshot copy
      LOG.debug("snapshot copy for " + backupId);
      snapshotCopy(backupInfo);
      // Updates incremental backup table set
      backupManager.addIncrementalBackupTableSet(backupInfo.getTables());
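      // From now on these tables are eligible for incremental backups, which require a full
      // backup as their starting point.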

      // BACKUP_COMPLETE:
      // Set overall backup status to complete. After this checkpoint, the backup is treated
      // as finished even if a cancel request comes in.
      backupInfo.setState(BackupState.COMPLETE);
      // The table list in backupInfo is good for both full backup and incremental backup.
      // For incremental backup, it contains the incremental backup table set.
      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
        backupManager.readLogTimestampMap();

      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
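      // The new start code is the minimum of the per-RS post-roll timestamps: WAL entries
      // written before this point are covered by the snapshots just taken.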
      Long newStartCode =
        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
      backupManager.writeBackupStartCode(newStartCode);

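      // The bulk loads gathered before the snapshots are now part of the backup images, so
      // their tracking rows can be removed from the system table.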
      backupManager
        .deleteBulkLoadedRows(bulkLoadsToDelete.stream().map(BulkLoad::getRowKey).toList());

      // backup complete
      completeBackup(conn, backupInfo, BackupType.FULL, conf);
    } catch (Exception e) {
      failBackup(conn, backupInfo, backupManager, e, "Unexpected backup exception: ",
        BackupType.FULL, conf);
      throw new IOException(e);
    }
  }

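  /**
   * Takes a snapshot of the given table, retrying on failure up to the configured maximum
   * number of attempts, with a fixed pause between attempts.
   */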
  protected void snapshotTable(Admin admin, TableName tableName, String snapshotName)
    throws IOException {
    int maxAttempts = conf.getInt(BACKUP_MAX_ATTEMPTS_KEY, DEFAULT_BACKUP_MAX_ATTEMPTS);
    int pause = conf.getInt(BACKUP_ATTEMPTS_PAUSE_MS_KEY, DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS);
    int attempts = 0;
    IOException lastException = null;

    while (attempts++ < maxAttempts) {
      try {
        admin.snapshot(snapshotName, tableName);
        return;
      } catch (IOException ee) {
        lastException = ee;
        LOG.warn("Snapshot attempt " + attempts + " failed for table " + tableName
          + ", sleeping for " + pause + "ms", ee);
        if (attempts < maxAttempts) {
          try {
            Thread.sleep(pause);
          } catch (InterruptedException e) {
            // restore the interrupt status and give up instead of retrying
            Thread.currentThread().interrupt();
            break;
          }
        }
      }
    }
    throw new IOException("Failed to snapshot table " + tableName, lastException);
  }
}