/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;

/**
 * Incremental backup implementation. See the {@link #execute() execute} method.
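 * <p>
 * This client is not normally used directly. An incremental backup is usually requested through
 * the {@code BackupAdmin} API (or the backup command-line tool), roughly as in the following
 * sketch; the connection setup, {@code tableName} and {@code backupRootDir} are placeholders:
 *
 * <pre>{@code
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *   BackupAdmin admin = new BackupAdminImpl(conn)) {
 *   BackupRequest request = new BackupRequest.Builder().withBackupType(BackupType.INCREMENTAL)
 *     .withTableList(Lists.newArrayList(tableName)).withTargetRootDir(backupRootDir).build();
 *   String backupId = admin.backupTables(request);
 * }
 * }</pre>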
 */
@InterfaceAudience.Private
public class IncrementalTableBackupClient extends TableBackupClient {
  private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class);

  protected IncrementalTableBackupClient() {
  }

  public IncrementalTableBackupClient(final Connection conn, final String backupId,
    BackupRequest request) throws IOException {
    super(conn, backupId, request);
  }

  protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
    List<String> list = new ArrayList<>();
    for (String file : incrBackupFileList) {
      Path p = new Path(file);
      if (fs.exists(p) || isActiveWalPath(p)) {
        list.add(file);
      } else {
        LOG.warn("Can't find file: " + file);
      }
    }
    return list;
  }

  /**
   * Checks whether a given path belongs to the active WAL directory, i.e. the file has not yet
   * been archived.
   * @param p path to check
   * @return true if the path is an active (non-archived) WAL path
   */
  protected boolean isActiveWalPath(Path p) {
    return !AbstractFSWALProvider.isArchivedLogFile(p);
  }

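  /**
   * Returns the index of {@code tbl} in {@code sTableList}, 0 if the list is {@code null}, or -1
   * if the table is not found.
   */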
  protected static int getIndex(TableName tbl, List<TableName> sTableList) {
    if (sTableList == null) {
      return 0;
    }

    for (int i = 0; i < sTableList.size(); i++) {
      if (tbl.equals(sTableList.get(i))) {
        return i;
      }
    }
    return -1;
  }

  /**
   * Reads bulk load records from the backup system table, iterates through the records and forms
   * the paths of the bulk-loaded HFiles, then copies those HFiles to the backup destination. This
   * method does NOT clean up the entries in the bulk load system table; those entries should not
   * be cleaned until the backup is marked as complete.
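   * <p>
   * In terms of layout (a sketch, with placeholders in braces), a bulk-loaded HFile found at
   *
   * <pre>
   * {hbase.rootdir}/data/{namespace}/{table}/{region}/{family}/{hfile}
   * </pre>
   *
   * (or at the corresponding location under the archive directory) is copied to
   *
   * <pre>
   * {backupRootDir}/{backupId}/{namespace}/{table}/{region}/{family}/{hfile}
   * </pre>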
   * @param tablesToBackup list of tables to be backed up
   * @return the bulk load records that were considered for this backup
   */
  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
    List<String> activeFiles = new ArrayList<>();
    List<String> archiveFiles = new ArrayList<>();
    List<BulkLoad> bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
    FileSystem tgtFs;
    try {
      tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
    } catch (URISyntaxException use) {
      throw new IOException("Unable to get FileSystem", use);
    }
    Path rootdir = CommonFSUtils.getRootDir(conf);
    Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);

    for (BulkLoad bulkLoad : bulkLoads) {
      TableName srcTable = bulkLoad.getTableName();
      String regionName = bulkLoad.getRegion();
      String fam = bulkLoad.getColumnFamily();
      String filename = FilenameUtils.getName(bulkLoad.getHfilePath());

      if (!tablesToBackup.contains(srcTable)) {
        LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
        continue;
      }
      Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
      Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);

      String srcTableQualifier = srcTable.getQualifierAsString();
      String srcTableNs = srcTable.getNamespaceAsString();
      Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
        + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
      if (!tgtFs.mkdirs(tgtFam)) {
        throw new IOException("couldn't create " + tgtFam);
      }
      Path tgt = new Path(tgtFam, filename);

      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
      Path archive = new Path(archiveDir, filename);

      if (fs.exists(p)) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.getHfilePath(), p.getParent(),
            srcTableQualifier);
          LOG.trace("copying {} to {}", p, tgt);
        }
        activeFiles.add(p.toString());
      } else if (fs.exists(archive)) {
        LOG.debug("copying archive {} to {}", archive, tgt);
        archiveFiles.add(archive.toString());
      }
    }

    copyBulkLoadedFiles(activeFiles, archiveFiles);
    return bulkLoads;
  }

  private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    try {
      // Enable special mode of BackupDistCp
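      // Preserving 5 path levels is assumed here to keep the
      // {namespace}/{table}/{region}/{family}/{hfile} suffix of each source file, matching the
      // per-family target directories created in handleBulkLoad()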
      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
      // Copy active files
      String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
      int attempt = 1;
      while (activeFiles.size() > 0) {
        LOG.info("Copy " + activeFiles.size() + " active bulk loaded files. Attempt =" + attempt++);
        String[] toCopy = new String[activeFiles.size()];
        activeFiles.toArray(toCopy);
        // An active file can be archived during the copy operation;
        // we need to handle this properly
        try {
          incrementalCopyHFiles(toCopy, tgtDest);
          break;
        } catch (IOException e) {
          // Check if some files got archived and update the active and archived lists:
          // when a file is moved from the active to the archive directory,
          // the number of active files decreases
          int numOfActive = activeFiles.size();
          updateFileLists(activeFiles, archiveFiles);
          if (activeFiles.size() < numOfActive) {
            continue;
          }
          // if not, rethrow the exception
          throw e;
        }
      }
      // If the incremental copy fails for archived files, we will have partially loaded files in
      // the backup destination (only files from the active data directory). That is OK, because
      // the backup will be marked as FAILED and the data will be cleaned up
      if (archiveFiles.size() > 0) {
        String[] toCopy = new String[archiveFiles.size()];
        archiveFiles.toArray(toCopy);
        incrementalCopyHFiles(toCopy, tgtDest);
      }
    } finally {
      // Disable special mode of BackupDistCp
      conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
    }
  }

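  /**
   * Moves every path in {@code activeFiles} that no longer exists on the source filesystem into
   * {@code archiveFiles}, on the assumption that it has been archived since the list was built.
   */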
  private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    List<String> newlyArchived = new ArrayList<>();

    for (String spath : activeFiles) {
      if (!fs.exists(new Path(spath))) {
        newlyArchived.add(spath);
      }
    }

    if (newlyArchived.size() > 0) {
      activeFiles.removeAll(newlyArchived);
      archiveFiles.addAll(newlyArchived);
    }

    LOG.debug(newlyArchived.size() + " files have been archived.");
  }

  /**
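   * Runs the incremental backup: verifies column family compatibility against the last full
   * backup, determines the set of WAL files written since the previous backup, converts those
   * WALs to HFiles and copies them (together with any bulk-loaded HFiles) to the backup
   * destination, then persists the new log timestamps and start code and marks the backup
   * complete.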
   * @throws IOException                   If the execution of the backup fails
   * @throws ColumnFamilyMismatchException If the column families of the current table do not match
   *                                       the column families of the last full backup, in which
   *                                       case a full backup should be taken instead
   */
  @Override
  public void execute() throws IOException, ColumnFamilyMismatchException {
    try {
      Map<TableName, String> tablesToFullBackupIds = getFullBackupIds();
      verifyCfCompatibility(backupInfo.getTables(), tablesToFullBackupIds);

      // case PREPARE_INCREMENTAL:
      beginBackup(backupManager, backupInfo);
      backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
      LOG.debug("For incremental backup, current table set is "
        + backupManager.getIncrementalBackupTableSet());
      newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
    } catch (Exception e) {
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }

    // case INCREMENTAL_COPY:
    try {
      // copy out the table and region info files for each table
      BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
      // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
      convertWALsToHFiles();
      incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
        backupInfo.getBackupRootDir());
    } catch (Exception e) {
      String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
    // case INCR_BACKUP_COMPLETE:
    // Set the overall backup status to complete. Here we make sure the backup is completed:
    // after this checkpoint, even if a cancel request comes in, the backup is allowed to finish.
    try {
      // Record the previousTimestampMap (timestamps from before this log roll) in the manifest.
      Map<TableName, Map<String, Long>> previousTimestampMap = backupManager.readLogTimestampMap();
      backupInfo.setIncrTimestampMap(previousTimestampMap);

      // The table list in backupInfo is good for both full backup and incremental backup.
      // For incremental backup, it contains the incremental backup table set.
      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
        backupManager.readLogTimestampMap();

      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
      Long newStartCode =
        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
      backupManager.writeBackupStartCode(newStartCode);

      List<BulkLoad> bulkLoads = handleBulkLoad(backupInfo.getTableNames());

      // backup complete
      completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf);

      List<byte[]> bulkLoadedRows = Lists.transform(bulkLoads, BulkLoad::getRowKey);
      backupManager.deleteBulkLoadedRows(bulkLoadedRows);
    } catch (IOException e) {
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
  }

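  /**
   * Copies the given file and directory paths to {@code backupDest} using the configured
   * {@link BackupCopyJob} (by default a DistCp-based MapReduce job), deleting the bulk output
   * directory when done.
   */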
  protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
    try {
      LOG.debug("Incremental copy HFiles is starting. dest=" + backupDest);
      // set overall backup phase: incremental_copy
      backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
      // get incremental backup file list and prepare params for DistCp
      String[] strArr = new String[files.length + 1];
      System.arraycopy(files, 0, strArr, 0, files.length);
      strArr[strArr.length - 1] = backupDest;

      String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
      }
      conf.set(JOB_NAME_CONF_KEY, jobname);

      BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
      int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
      if (res != 0) {
        LOG.error("Copy incremental HFile files failed with return code: " + res + ".");
        throw new IOException(
          "Failed copy from " + StringUtils.join(files, ',') + " to " + backupDest);
      }
      LOG.debug("Incremental copy HFiles from " + StringUtils.join(files, ',') + " to " + backupDest
        + " finished.");
    } finally {
      deleteBulkLoadDirectory();
    }
  }

  protected void deleteBulkLoadDirectory() throws IOException {
    // delete original bulk load directory on method exit
    Path path = getBulkOutputDir();
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    boolean result = fs.delete(path, true);
    if (!result) {
      LOG.warn("Could not delete " + path);
    }
  }

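  /**
   * Converts the WAL files collected for this backup into HFiles by running a {@link WALPlayer}
   * job; the output is written under the bulk output directory (see {@link #getBulkOutputDir()}).
   */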
  protected void convertWALsToHFiles() throws IOException {
    // get the incremental backup file list
    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
    // Get list of tables in incremental backup set
    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
    // filter missing files out (they have been copied by previous backups)
    incrBackupFileList = filterMissingFiles(incrBackupFileList);
    List<String> tableList = new ArrayList<>();
    for (TableName table : tableSet) {
      // Check if table exists
      if (tableExists(table, conn)) {
        tableList.add(table.getNameAsString());
      } else {
        LOG.warn("Table " + table + " does not exist. Skipping it in the WAL converter");
      }
    }
    walToHFiles(incrBackupFileList, tableList);
  }

  protected boolean tableExists(TableName table, Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(table);
    }
  }

  protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
    Tool player = new WALPlayer();

    // The player reads all files in an arbitrary directory structure and creates
    // a map task for each file. We use ';' as the separator
    // because WAL file names contain ','.
    String dirs = StringUtils.join(dirPaths, ';');
    String jobname = "Incremental_Backup-" + backupId;

    Path bulkOutputPath = getBulkOutputDir();
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
    conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
    conf.set(JOB_NAME_CONF_KEY, jobname);
    String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };

    try {
      player.setConf(conf);
      int result = player.run(playerArgs);
      if (result != 0) {
        throw new IOException("WAL Player failed");
      }
      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
      conf.unset(JOB_NAME_CONF_KEY);
    } catch (IOException e) {
      throw e;
    } catch (Exception ee) {
      throw new IOException("Cannot convert from directory " + dirs
        + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
    }
  }

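  /**
   * Returns the bulk output directory for a single table:
   * {@code <bulkOutputDir>/<namespace>/<qualifier>/data}.
   */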
  protected Path getBulkOutputDirForTable(TableName table) {
    Path tablePath = getBulkOutputDir();
    tablePath = new Path(tablePath, table.getNamespaceAsString());
    tablePath = new Path(tablePath, table.getQualifierAsString());
    return new Path(tablePath, "data");
  }

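  /**
   * Returns the staging directory for WALPlayer output: {@code <backupRootDir>/.tmp/<backupId>}.
   */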
  protected Path getBulkOutputDir() {
    String backupId = backupInfo.getBackupId();
    Path path = new Path(backupInfo.getBackupRootDir());
    path = new Path(path, ".tmp");
    path = new Path(path, backupId);
    return path;
  }

  private Map<TableName, String> getFullBackupIds() throws IOException {
    // Ancestors are stored from newest to oldest, so we can iterate backwards
    // in order to populate our backupId map with the most recent full backup
    // for a given table
    List<BackupManifest.BackupImage> images = getAncestors(backupInfo);
    Map<TableName, String> results = new HashMap<>();
    for (int i = images.size() - 1; i >= 0; i--) {
      BackupManifest.BackupImage image = images.get(i);
      if (image.getType() != BackupType.FULL) {
        continue;
      }

      for (TableName tn : image.getTableNames()) {
        results.put(tn, image.getBackupId());
      }
    }
    return results;
  }

  /**
   * Verifies that the column families in the current table descriptors match the column families
   * recorded in the last full backup of those tables. This ensures CF compatibility across
   * incremental backups. If a mismatch is detected, a full table backup should be taken rather
   * than an incremental one.
   */
  private void verifyCfCompatibility(Set<TableName> tables,
    Map<TableName, String> tablesToFullBackupId) throws IOException, ColumnFamilyMismatchException {
    ColumnFamilyMismatchException.ColumnFamilyMismatchExceptionBuilder exBuilder =
      ColumnFamilyMismatchException.newBuilder();
    try (Admin admin = conn.getAdmin(); BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) {
      for (TableName tn : tables) {
        String backupId = tablesToFullBackupId.get(tn);
        BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(backupId);

        ColumnFamilyDescriptor[] currentCfs = admin.getDescriptor(tn).getColumnFamilies();
        String snapshotName = fullBackupInfo.getSnapshotName(tn);
        Path root = HBackupFileSystem.getTableBackupPath(tn,
          new Path(fullBackupInfo.getBackupRootDir()), fullBackupInfo.getBackupId());
        Path manifestDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root);

        FileSystem fs;
        try {
          fs = FileSystem.get(new URI(fullBackupInfo.getBackupRootDir()), conf);
        } catch (URISyntaxException e) {
          throw new IOException("Unable to get fs for backup " + fullBackupInfo.getBackupId(), e);
        }

        SnapshotProtos.SnapshotDescription snapshotDescription =
          SnapshotDescriptionUtils.readSnapshotInfo(fs, manifestDir);
        SnapshotManifest manifest =
          SnapshotManifest.open(conf, fs, manifestDir, snapshotDescription);

        ColumnFamilyDescriptor[] backupCfs = manifest.getTableDescriptor().getColumnFamilies();
        if (!areCfsCompatible(currentCfs, backupCfs)) {
          exBuilder.addMismatchedTable(tn, currentCfs, backupCfs);
        }
      }
    }

    ColumnFamilyMismatchException ex = exBuilder.build();
    if (!ex.getMismatchedTables().isEmpty()) {
      throw ex;
    }
  }

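  /**
   * Compares the two column family arrays positionally by name only; other column family
   * attributes are not considered.
   */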
  private static boolean areCfsCompatible(ColumnFamilyDescriptor[] currentCfs,
    ColumnFamilyDescriptor[] backupCfs) {
    if (currentCfs.length != backupCfs.length) {
      return false;
    }

    for (int i = 0; i < backupCfs.length; i++) {
      String currentCf = currentCfs[i].getNameAsString();
      String backupCf = backupCfs[i].getNameAsString();

      if (!currentCf.equals(backupCf)) {
        return false;
      }
    }

    return true;
  }
}