/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupMergeJob;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce implementation of {@link BackupMergeJob}. Must be initialized with the configuration
 * of a backup destination cluster.
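 * <p>
 * A minimal usage sketch; the backup ids below are hypothetical placeholders:
 *
 * <pre>
 * MapReduceBackupMergeJob mergeJob = new MapReduceBackupMergeJob();
 * mergeJob.setConf(conf); // configuration of the backup destination cluster
 * // hypothetical backup ids of the images to merge
 * mergeJob.run(new String[] { "backup_1700000000001", "backup_1700000000002" });
 * </pre>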
 */
@InterfaceAudience.Private
public class MapReduceBackupMergeJob implements BackupMergeJob {
  public static final Logger LOG = LoggerFactory.getLogger(MapReduceBackupMergeJob.class);

  protected Tool player;
  protected Configuration conf;

  public MapReduceBackupMergeJob() {
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public void run(String[] backupIds) throws IOException {
    String bulkOutputConfKey;

    // TODO : run player on remote cluster
    player = new MapReduceHFileSplitterJob();
    bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
    // Player reads all files in arbitrary directory structure and creates
    // a Map task for each file
    String bids = StringUtils.join(backupIds, ",");

    if (LOG.isDebugEnabled()) {
      LOG.debug("Merge backup images " + bids);
    }

    List<Pair<TableName, Path>> processedTableList = new ArrayList<>();
    boolean finishedTables = false;
    Connection conn = ConnectionFactory.createConnection(getConf());
    BackupSystemTable table = new BackupSystemTable(conn);
    FileSystem fs = null;

    try {
      // Get exclusive lock on backup system
      table.startBackupExclusiveOperation();
      // Start merge operation
      table.startMergeOperation(backupIds);

      // Select most recent backup id
      String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);

      TableName[] tableNames = getTableNamesInBackupImages(backupIds);

      BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
      String backupRoot = bInfo.getBackupRootDir();
      Path backupRootPath = new Path(backupRoot);
      fs = backupRootPath.getFileSystem(conf);

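      // PHASE 1 (existing backup images are not modified yet): run the HFile splitter player
      // per table; its output goes into a temporary bulk output directory under the backup root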
      for (int i = 0; i < tableNames.length; i++) {
        LOG.info("Merge backup images for " + tableNames[i]);

        // Find input directories for table
        Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
        String dirs = StringUtils.join(dirPaths, ",");

        // bulkOutputPath should be on the same filesystem as backupRoot
        Path tmpRestoreOutputDir = HBackupFileSystem.getBackupTmpDirPath(backupRoot);
        Path bulkOutputPath = BackupUtils.getBulkOutputDir(tmpRestoreOutputDir,
          BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false);
        // Delete content if exists
        if (fs.exists(bulkOutputPath)) {
          if (!fs.delete(bulkOutputPath, true)) {
            LOG.warn("Can not delete: " + bulkOutputPath);
          }
        }
        Configuration conf = getConf();
        conf.set(bulkOutputConfKey, bulkOutputPath.toString());
        String[] playerArgs = { dirs, tableNames[i].getNameAsString() };

        player.setConf(getConf());
        int result = player.run(playerArgs);
        if (!succeeded(result)) {
          throw new IOException("Can not merge backup images for " + dirs
            + " (check Hadoop/MR and HBase logs). Player return code =" + result);
        }
        // Add to processed table list
        processedTableList.add(new Pair<>(tableNames[i], bulkOutputPath));
        LOG.debug("Merge Job finished:" + result);
      }
      List<TableName> tableList = toTableNameList(processedTableList);
      table.updateProcessedTablesForMerge(tableList);
      finishedTables = true;

      // PHASE 2 (modification of a backup file system)
      // Move existing mergedBackupId data into tmp directory
      // we will need it later in case of a failure
      Path tmpBackupDir =
        HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot, mergedBackupId);
      Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);

      if (!fs.rename(backupDirPath, tmpBackupDir)) {
        throw new IOException("Failed to rename " + backupDirPath + " to " + tmpBackupDir);
      } else {
        LOG.debug("Renamed " + backupDirPath + " to " + tmpBackupDir);
      }
      // Move new data into backup dest
      for (Pair<TableName, Path> tn : processedTableList) {
        moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
      }
      // Update backup manifest
      List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
      updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
      // Copy meta files back from tmp to backup dir
      copyMetaData(fs, tmpBackupDir, backupDirPath);
      // Delete tmp dir (Rename back during repair)
      if (!fs.delete(tmpBackupDir, true)) {
        // WARN and ignore
        LOG.warn("Could not delete tmp dir: " + tmpBackupDir);
      }
      // Delete old data
      deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
      // Finish merge session
      table.finishMergeOperation();
      // Release lock
      table.finishBackupExclusiveOperation();
    } catch (RuntimeException e) {
      throw e;
    } catch (Exception e) {
      LOG.error(e.toString(), e);
      if (!finishedTables) {
        // cleanup bulk directories and finish merge
        // merge MUST be repeated (no need for repair)
        if (fs != null) {
          cleanupBulkLoadDirs(fs, toPathList(processedTableList));
        }
        table.finishMergeOperation();
        table.finishBackupExclusiveOperation();
        throw new IOException("Backup merge operation failed, you should try it again", e);
      } else {
        // backup repair must be run
        throw new IOException(
          "Backup merge operation failed, run backup repair tool to restore system's integrity", e);
      }
    } finally {
      table.close();
      conn.close();
    }
  }

  /**
   * Copies the metadata of a backup session from the temporary backup directory back to the
   * backup destination directory.
   * @param fs            file system
   * @param tmpBackupDir  temporary backup directory where the metadata is located
   * @param backupDirPath new path for backup
   * @throws IOException exception
   */
  protected void copyMetaData(FileSystem fs, Path tmpBackupDir, Path backupDirPath)
    throws IOException {
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(tmpBackupDir, true);
    List<Path> toKeep = new ArrayList<Path>();
    while (it.hasNext()) {
      Path p = it.next().getPath();
      if (fs.isDirectory(p)) {
        continue;
      }
      // Keep meta
      String fileName = p.toString();
      if (
        fileName.indexOf(FSTableDescriptors.TABLEINFO_DIR) > 0
          || fileName.indexOf(HRegionFileSystem.REGION_INFO_FILE) > 0
          || fileName.indexOf(BackupManifest.MANIFEST_FILE_NAME) > 0
      ) {
        toKeep.add(p);
      }
    }
    // Copy meta to destination
    for (Path p : toKeep) {
      Path newPath = convertToDest(p, backupDirPath);
      LOG.info("Copying tmp metadata from {} to {}", p, newPath);
      copyFile(fs, p, newPath);
    }
  }

  /**
   * Copy file in DFS from p to newPath
   * @param fs      file system
   * @param p       old path
   * @param newPath new path
   * @throws IOException exception
   */
  protected void copyFile(FileSystem fs, Path p, Path newPath) throws IOException {
    try (InputStream in = fs.open(p); OutputStream out = fs.create(newPath, true)) {
      IOUtils.copy(in, out);
    }
    boolean exists = fs.exists(newPath);
    if (!exists) {
      throw new IOException("Failed to copy meta file to: " + newPath);
    }
  }

  /**
   * Converts a path from the temporary backup directory to the corresponding path under the
   * destination backup directory, preserving the relative path below the backup id segment.
   * @param p             path under the temporary backup directory
   * @param backupDirPath destination backup directory (its name is the backup id)
   * @return converted path
   */
  protected Path convertToDest(Path p, Path backupDirPath) {
    String backupId = backupDirPath.getName();
    Deque<String> stack = new ArrayDeque<String>();
    String name = null;
    while (true) {
      name = p.getName();
      if (!name.equals(backupId)) {
        stack.push(name);
        p = p.getParent();
      } else {
        break;
      }
    }
    Path newPath = new Path(backupDirPath.toString());
    while (!stack.isEmpty()) {
      newPath = new Path(newPath, stack.pop());
    }
    return newPath;
  }

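  /**
   * Extracts the bulk output paths from a list of processed (table, bulk output path) pairs.
   * @param processedTableList list of processed table / bulk output path pairs
   * @return list of bulk output paths
   */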
  protected List<Path> toPathList(List<Pair<TableName, Path>> processedTableList) {
    ArrayList<Path> list = new ArrayList<>();
    for (Pair<TableName, Path> p : processedTableList) {
      list.add(p.getSecond());
    }
    return list;
  }

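  /**
   * Extracts the table names from a list of processed (table, bulk output path) pairs.
   * @param processedTableList list of processed table / bulk output path pairs
   * @return list of table names
   */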
  protected List<TableName> toTableNameList(List<Pair<TableName, Path>> processedTableList) {
    ArrayList<TableName> list = new ArrayList<>();
    for (Pair<TableName, Path> p : processedTableList) {
      list.add(p.getFirst());
    }
    return list;
  }

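  /**
   * Deletes the temporary bulk output directories created during the merge.
   * @param fs       file system
   * @param pathList bulk output directories to delete
   * @throws IOException exception
   */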
  protected void cleanupBulkLoadDirs(FileSystem fs, List<Path> pathList) throws IOException {
    for (Path path : pathList) {
      if (!fs.delete(path, true)) {
        LOG.warn("Can't delete " + path);
      }
    }
  }

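  /**
   * Updates the manifest of the merged backup: removes the merged-in backup ids from its ancestor
   * list and stores the updated manifest back.
   * @param backupRoot      backup root directory
   * @param mergedBackupId  backup id of the resulting merged backup
   * @param backupsToDelete backup ids that were merged in and will be deleted
   * @throws IOException exception
   */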
  protected void updateBackupManifest(String backupRoot, String mergedBackupId,
    List<String> backupsToDelete) throws IllegalArgumentException, IOException {
    BackupManifest manifest =
      HBackupFileSystem.getManifest(conf, new Path(backupRoot), mergedBackupId);
    LOG.info("Removing ancestors from merged backup {} : {}", mergedBackupId, backupsToDelete);
    manifest.getBackupImage().removeAncestors(backupsToDelete);
    // save back
    LOG.info("Creating new manifest file for merged backup {} at root {}", mergedBackupId,
      backupRoot);
    manifest.store(conf);
  }

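  /**
   * Deletes the merged-in backups: removes their metadata from the backup system table and purges
   * their directories from the file system.
   * @param backupIds  backup ids to delete
   * @param conn       connection to the cluster
   * @param fs         file system of the backup destination
   * @param backupRoot backup root directory
   * @throws IOException exception
   */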
  protected void deleteBackupImages(List<String> backupIds, Connection conn, FileSystem fs,
    String backupRoot) throws IOException {
    // Delete from backup system table
    try (BackupSystemTable table = new BackupSystemTable(conn)) {
      for (String backupId : backupIds) {
        LOG.info("Removing metadata for backup {}", backupId);
        table.deleteBackupInfo(backupId);
      }
    }

    // Delete from file system
    for (String backupId : backupIds) {
      LOG.info("Purging backup {} from FileSystem", backupId);
      Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, backupId);

      if (!fs.delete(backupDirPath, true)) {
        LOG.warn("Could not delete " + backupDirPath);
      }
    }
  }

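  /**
   * Returns all backup ids from the merge request except the id the images are merged into.
   * @param backupIds      backup ids being merged
   * @param mergedBackupId backup id of the resulting merged backup
   * @return backup ids to delete after the merge
   */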
  protected List<String> getBackupIdsToDelete(String[] backupIds, String mergedBackupId) {
    List<String> list = new ArrayList<>();
    for (String id : backupIds) {
      if (id.equals(mergedBackupId)) {
        continue;
      }
      list.add(id);
    }
    return list;
  }

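  /**
   * Moves the merged HFiles of a table from the bulk output directory into the table directory of
   * the merged backup, one sub-directory per column family.
   * @param fs             file system
   * @param backupRoot     backup root directory
   * @param bulkOutputPath bulk output directory produced by the player job
   * @param tableName      table name
   * @param mergedBackupId backup id of the resulting merged backup
   * @throws IOException exception
   */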
  protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath,
    TableName tableName, String mergedBackupId) throws IllegalArgumentException, IOException {
    Path dest =
      new Path(HBackupFileSystem.getTableBackupDir(backupRoot, mergedBackupId, tableName));

    FileStatus[] fsts = fs.listStatus(bulkOutputPath);
    for (FileStatus fst : fsts) {
      if (fst.isDirectory()) {
        String family = fst.getPath().getName();
        Path newDst = new Path(dest, family);
        if (fs.exists(newDst)) {
          if (!fs.delete(newDst, true)) {
            throw new IOException("failed to delete :" + newDst);
          }
        } else {
          fs.mkdirs(dest);
        }
        boolean result = fs.rename(fst.getPath(), dest);
        LOG.debug("MoveData from " + fst.getPath() + " to " + dest + " result=" + result);
      }
    }
  }

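  /**
   * Collects the union of table names covered by the given backup images.
   * @param backupIds backup ids
   * @return table names present in at least one of the backup images
   * @throws IOException exception
   */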
  protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException {
    Set<TableName> allSet = new HashSet<>();

    try (Connection conn = ConnectionFactory.createConnection(conf);
      BackupSystemTable table = new BackupSystemTable(conn)) {
      for (String backupId : backupIds) {
        BackupInfo bInfo = table.readBackupInfo(backupId);

        allSet.addAll(bInfo.getTableNames());
      }
    }

    TableName[] ret = new TableName[allSet.size()];
    return allSet.toArray(ret);
  }

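  /**
   * Finds the existing backup directories of a table across the given backup images; images that
   * do not contain the table are skipped.
   * @param fs         file system
   * @param backupRoot backup root directory
   * @param tableName  table name
   * @param backupIds  backup ids
   * @return existing input directories for the table
   * @throws IOException exception
   */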
  protected Path[] findInputDirectories(FileSystem fs, String backupRoot, TableName tableName,
    String[] backupIds) throws IOException {
    List<Path> dirs = new ArrayList<>();

    for (String backupId : backupIds) {
      Path fileBackupDirPath =
        new Path(HBackupFileSystem.getTableBackupDir(backupRoot, backupId, tableName));
      if (fs.exists(fileBackupDirPath)) {
        dirs.add(fileBackupDirPath);
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("File: " + fileBackupDirPath + " does not exist.");
        }
      }
    }
    Path[] ret = new Path[dirs.size()];
    return dirs.toArray(ret);
  }
}