/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Incremental backup implementation. See the {@link #execute() execute} method.
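 * <p>
 * A minimal usage sketch, assuming the {@code conf}, {@code backupId} and {@code request} are
 * assembled by the caller (in practice this class is {@code InterfaceAudience.Private} and is
 * created by the backup admin code rather than instantiated directly):
 *
 * <pre>
 * try (Connection conn = ConnectionFactory.createConnection(conf)) {
 *   BackupRequest request = ...; // incremental backup request built by the caller
 *   TableBackupClient client = new IncrementalTableBackupClient(conn, backupId, request);
 *   client.execute();
 * }
 * </pre>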
 */
@InterfaceAudience.Private
public class IncrementalTableBackupClient extends TableBackupClient {
  private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class);

  protected IncrementalTableBackupClient() {
  }

  public IncrementalTableBackupClient(final Connection conn, final String backupId,
    BackupRequest request) throws IOException {
    super(conn, backupId, request);
  }

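  /**
   * Keeps only those files from {@code incrBackupFileList} that still exist on the file system or
   * still live under the active WAL directory; files that can no longer be found are logged and
   * dropped (they have typically been copied by previous backups already).
   * @param incrBackupFileList WAL file paths collected for this incremental backup
   * @return the filtered list of file paths
   */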
  protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
    List<String> list = new ArrayList<>();
    for (String file : incrBackupFileList) {
      Path p = new Path(file);
      if (fs.exists(p) || isActiveWalPath(p)) {
        list.add(file);
      } else {
        LOG.warn("Can't find file: " + file);
      }
    }
    return list;
  }

  /**
   * Checks whether the given path belongs to the active WAL directory, i.e. it has not been
   * archived yet.
   * @param p path to check
   * @return true if the path is an active WAL path
   */
  protected boolean isActiveWalPath(Path p) {
    return !AbstractFSWALProvider.isArchivedLogFile(p);
  }

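  /**
   * Returns the position of {@code tbl} in {@code sTableList}, or -1 if the table is not in the
   * list. A {@code null} list is treated as a match at index 0.
   */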
  protected static int getIndex(TableName tbl, List<TableName> sTableList) {
    if (sTableList == null) {
      return 0;
    }

    for (int i = 0; i < sTableList.size(); i++) {
      if (tbl.equals(sTableList.get(i))) {
        return i;
      }
    }
    return -1;
  }

  /**
   * Reads bulk load records from the backup system table, iterates through the records and forms
   * the paths of the bulk loaded hfiles, then copies those hfiles to the backup destination.
   * @param sTableList list of tables to be backed up
   * @return array, indexed like {@code sTableList}, of maps from column family to the list of
   *         copied hfile paths
   */
  @SuppressWarnings("unchecked")
  protected Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList)
    throws IOException {
    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
    List<String> activeFiles = new ArrayList<>();
    List<String> archiveFiles = new ArrayList<>();
    Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
      backupManager.readBulkloadRows(sTableList);
    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
    FileSystem tgtFs;
    try {
      tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
    } catch (URISyntaxException use) {
      throw new IOException("Unable to get FileSystem", use);
    }
    Path rootdir = CommonFSUtils.getRootDir(conf);
    Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);

    for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry : map
      .entrySet()) {
      TableName srcTable = tblEntry.getKey();

      int srcIdx = getIndex(srcTable, sTableList);
      if (srcIdx < 0) {
        LOG.warn("Couldn't find " + srcTable + " in source table List");
        continue;
      }
      if (mapForSrc[srcIdx] == null) {
        mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      }
      Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
      Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
        srcTable.getQualifierAsString());
      for (Map.Entry<String, Map<String, List<Pair<String, Boolean>>>> regionEntry : tblEntry
        .getValue().entrySet()) {
        String regionName = regionEntry.getKey();
        Path regionDir = new Path(tblDir, regionName);
        // map from family to List of hfiles
        for (Map.Entry<String, List<Pair<String, Boolean>>> famEntry : regionEntry.getValue()
          .entrySet()) {
          String fam = famEntry.getKey();
          Path famDir = new Path(regionDir, fam);
          List<Path> files;
          if (!mapForSrc[srcIdx].containsKey(Bytes.toBytes(fam))) {
            files = new ArrayList<>();
            mapForSrc[srcIdx].put(Bytes.toBytes(fam), files);
          } else {
            files = mapForSrc[srcIdx].get(Bytes.toBytes(fam));
          }
          Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
          String tblName = srcTable.getQualifierAsString();
          Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
          if (!tgtFs.mkdirs(tgtFam)) {
            throw new IOException("couldn't create " + tgtFam);
          }
          for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
            String file = fileWithState.getFirst();
            int idx = file.lastIndexOf("/");
            String filename = file;
            if (idx > 0) {
              filename = file.substring(idx + 1);
            }
            Path p = new Path(famDir, filename);
            Path tgt = new Path(tgtFam, filename);
            Path archive = new Path(archiveDir, filename);
            if (fs.exists(p)) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
                LOG.trace("copying " + p + " to " + tgt);
              }
              activeFiles.add(p.toString());
            } else if (fs.exists(archive)) {
              LOG.debug("copying archive " + archive + " to " + tgt);
              archiveFiles.add(archive.toString());
            }
            files.add(tgt);
          }
        }
      }
    }

    copyBulkLoadedFiles(activeFiles, archiveFiles);
    backupManager.deleteBulkLoadedRows(pair.getSecond());
    return mapForSrc;
  }

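  /**
   * Copies the collected bulk loaded hfiles to the backup destination. Active files are copied
   * first; if some of them get archived while the copy is running, the file lists are refreshed
   * and the copy is retried. Archived files are copied in a second pass.
   */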
  private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    try {
      // Enable special mode of BackupDistCp
      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
      // Copy active files
      String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
      int attempt = 1;
      while (activeFiles.size() > 0) {
        LOG.info("Copy " + activeFiles.size() + " active bulk loaded files. Attempt =" + attempt++);
        String[] toCopy = new String[activeFiles.size()];
        activeFiles.toArray(toCopy);
        // Active files can be archived during the copy operation;
        // we need to handle this properly
        try {
          incrementalCopyHFiles(toCopy, tgtDest);
          break;
        } catch (IOException e) {
          // Check if some files got archived and update the active and archived lists.
          // When a file is moved from the active to the archive directory,
          // the number of active files decreases.
          int numOfActive = activeFiles.size();
          updateFileLists(activeFiles, archiveFiles);
          if (activeFiles.size() < numOfActive) {
            continue;
          }
          // if not - throw exception
          throw e;
        }
      }
      // If the incremental copy fails for archived files, we will have partially loaded files
      // in the backup destination (only files from the active data directory). That is OK,
      // because the backup will be marked as FAILED and the data will be cleaned up.
      if (archiveFiles.size() > 0) {
        String[] toCopy = new String[archiveFiles.size()];
        archiveFiles.toArray(toCopy);
        incrementalCopyHFiles(toCopy, tgtDest);
      }
    } finally {
      // Disable special mode of BackupDistCp
      conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
    }
  }

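  /**
   * Moves entries that have disappeared from the underlying file system from the active file list
   * to the archived file list.
   */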
  private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    List<String> newlyArchived = new ArrayList<>();

    for (String spath : activeFiles) {
      if (!fs.exists(new Path(spath))) {
        newlyArchived.add(spath);
      }
    }

    if (newlyArchived.size() > 0) {
      activeFiles.removeAll(newlyArchived);
      archiveFiles.addAll(newlyArchived);
    }

    LOG.debug(newlyArchived.size() + " files have been archived.");
  }

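  /**
   * Runs the incremental backup: prepares the incremental WAL file map, converts the WALs to
   * HFiles and copies them to the backup destination, then records the new log timestamps and
   * start code, copies bulk loaded hfiles and marks the backup as complete.
   */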
  @Override
  public void execute() throws IOException {
    try {
      // case PREPARE_INCREMENTAL:
      beginBackup(backupManager, backupInfo);
      backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
      LOG.debug("For incremental backup, current table set is "
        + backupManager.getIncrementalBackupTableSet());
      newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
    } catch (Exception e) {
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }

    // case INCREMENTAL_COPY:
    try {
      // copy out the table and region info files for each table
      BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
      // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
      convertWALsToHFiles();
      incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
        backupInfo.getBackupRootDir());
    } catch (Exception e) {
      String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
    // case INCR_BACKUP_COMPLETE:
    // set overall backup status: complete. Here we make sure to complete the backup.
    // After this checkpoint, even if a cancel is requested, the backup will be allowed to finish.
    try {
      // Set the previousTimestampMap which is before this current log roll to the manifest.
      Map<TableName, Map<String, Long>> previousTimestampMap = backupManager.readLogTimestampMap();
      backupInfo.setIncrTimestampMap(previousTimestampMap);

      // The table list in backupInfo is good for both full backup and incremental backup.
      // For incremental backup, it contains the incremental backup table set.
      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
        backupManager.readLogTimestampMap();

      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
      Long newStartCode =
        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
      backupManager.writeBackupStartCode(newStartCode);

      handleBulkLoad(backupInfo.getTableNames());
      // backup complete
      completeBackup(conn, backupInfo, backupManager, BackupType.INCREMENTAL, conf);

    } catch (IOException e) {
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
  }

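  /**
   * Copies the given files or directories to the backup destination using the configured backup
   * copy job. The bulk output (WAL conversion) directory is deleted when the copy finishes,
   * whether or not it succeeded.
   * @param files      source paths to copy
   * @param backupDest backup destination directory
   */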
  protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
    try {
      LOG.debug("Incremental copy HFiles is starting. dest=" + backupDest);
      // set overall backup phase: incremental_copy
      backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
      // get incremental backup file list and prepare params for DistCp
      String[] strArr = new String[files.length + 1];
      System.arraycopy(files, 0, strArr, 0, files.length);
      strArr[strArr.length - 1] = backupDest;

      String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
      }
      conf.set(JOB_NAME_CONF_KEY, jobname);

      BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
      int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
      if (res != 0) {
        LOG.error("Copy incremental HFiles failed with return code: " + res + ".");
        throw new IOException(
          "Failed copy from " + StringUtils.join(files, ',') + " to " + backupDest);
      }
      LOG.debug("Incremental copy HFiles from " + StringUtils.join(files, ',') + " to " + backupDest
        + " finished.");
    } finally {
      deleteBulkLoadDirectory();
    }
  }

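  /**
   * Deletes the bulk output (WAL conversion) staging directory; a failure to delete is only
   * logged.
   */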
  protected void deleteBulkLoadDirectory() throws IOException {
    // delete original bulk load directory on method exit
    Path path = getBulkOutputDir();
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    boolean result = fs.delete(path, true);
    if (!result) {
      LOG.warn("Could not delete " + path);
    }
  }

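  /**
   * Converts the WAL files accumulated since the last backup into HFiles under the bulk output
   * directory, skipping tables that no longer exist.
   */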
  protected void convertWALsToHFiles() throws IOException {
    // get the list of WAL files accumulated since the last backup
    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
    // Get list of tables in incremental backup set
    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
    // filter missing files out (they have been copied by previous backups)
    incrBackupFileList = filterMissingFiles(incrBackupFileList);
    List<String> tableList = new ArrayList<String>();
    for (TableName table : tableSet) {
      // Check if table exists
      if (tableExists(table, conn)) {
        tableList.add(table.getNameAsString());
      } else {
        LOG.warn("Table " + table + " does not exist. Skipping it in the WAL converter");
      }
    }
    walToHFiles(incrBackupFileList, tableList);
  }

  protected boolean tableExists(TableName table, Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(table);
    }
  }

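  /**
   * Runs WALPlayer over the given WAL files/directories and writes the resulting HFiles, grouped
   * by table, into the bulk output directory.
   * @param dirPaths  WAL files or directories to replay
   * @param tableList tables to generate HFiles for
   */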
  protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
    Tool player = new WALPlayer();

    // WALPlayer reads all files in an arbitrary directory structure and creates
    // a map task for each file. We use ';' as the separator
    // because WAL file names contain ','
    String dirs = StringUtils.join(dirPaths, ';');
    String jobname = "Incremental_Backup-" + backupId;

    Path bulkOutputPath = getBulkOutputDir();
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
    conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true);
    conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
    conf.set(JOB_NAME_CONF_KEY, jobname);
    String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };

    try {
      player.setConf(conf);
      int result = player.run(playerArgs);
      if (result != 0) {
        throw new IOException("WAL Player failed");
      }
      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
      conf.unset(JOB_NAME_CONF_KEY);
    } catch (IOException e) {
      throw e;
    } catch (Exception ee) {
      throw new IOException("Can not convert from directory " + dirs
        + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
    }
  }

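  /**
   * Returns the bulk output directory for a single table:
   * {@code <bulkOutputDir>/<namespace>/<table>/data}.
   */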
  protected Path getBulkOutputDirForTable(TableName table) {
    Path tablePath = getBulkOutputDir();
    tablePath = new Path(tablePath, table.getNamespaceAsString());
    tablePath = new Path(tablePath, table.getQualifierAsString());
    return new Path(tablePath, "data");
  }

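  /**
   * Returns the bulk output (WAL conversion) staging directory:
   * {@code <backupRootDir>/.tmp/<backupId>}.
   */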
  protected Path getBulkOutputDir() {
    String backupId = backupInfo.getBackupId();
    Path path = new Path(backupInfo.getBackupRootDir());
    path = new Path(path, ".tmp");
    path = new Path(path, backupId);
    return path;
  }
}