001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup.mapreduce; 019 020import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded; 021 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025import java.util.ArrayDeque; 026import java.util.ArrayList; 027import java.util.Deque; 028import java.util.HashSet; 029import java.util.List; 030import java.util.Set; 031import org.apache.commons.io.IOUtils; 032import org.apache.commons.lang3.StringUtils; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.LocatedFileStatus; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.fs.RemoteIterator; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.backup.BackupInfo; 041import org.apache.hadoop.hbase.backup.BackupMergeJob; 042import org.apache.hadoop.hbase.backup.HBackupFileSystem; 043import org.apache.hadoop.hbase.backup.impl.BackupManifest; 044import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; 045import org.apache.hadoop.hbase.backup.util.BackupUtils; 046import org.apache.hadoop.hbase.client.Connection; 047import org.apache.hadoop.hbase.client.ConnectionFactory; 048import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; 049import org.apache.hadoop.hbase.util.FSTableDescriptors; 050import org.apache.hadoop.hbase.util.Pair; 051import org.apache.hadoop.util.Tool; 052import org.apache.yetus.audience.InterfaceAudience; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056/** 057 * MapReduce implementation of {@link BackupMergeJob} Must be initialized with configuration of a 058 * backup destination cluster 059 */ 060@InterfaceAudience.Private 061public class MapReduceBackupMergeJob implements BackupMergeJob { 062 public static final Logger LOG = LoggerFactory.getLogger(MapReduceBackupMergeJob.class); 063 064 protected Tool player; 065 protected Configuration conf; 066 067 public MapReduceBackupMergeJob() { 068 } 069 070 @Override 071 public Configuration getConf() { 072 return conf; 073 } 074 075 @Override 076 public void setConf(Configuration conf) { 077 this.conf = conf; 078 } 079 080 @Override 081 public void run(String[] backupIds) throws IOException { 082 String bulkOutputConfKey; 083 084 // TODO : run player on remote cluster 085 player = new MapReduceHFileSplitterJob(); 086 bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY; 087 // Player reads all files in arbitrary directory structure and creates 088 // a Map task for each file 089 String bids = StringUtils.join(backupIds, ","); 090 091 if (LOG.isDebugEnabled()) { 092 LOG.debug("Merge backup images " + bids); 093 } 094 095 List<Pair<TableName, Path>> processedTableList = new ArrayList<>(); 096 boolean finishedTables = false; 097 Connection conn = ConnectionFactory.createConnection(getConf()); 098 BackupSystemTable table = new BackupSystemTable(conn); 099 FileSystem fs = null; 100 101 try { 102 103 // Get exclusive lock on backup system 104 table.startBackupExclusiveOperation(); 105 // Start merge operation 106 table.startMergeOperation(backupIds); 107 108 // Select most recent backup id 109 String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds); 110 111 TableName[] tableNames = getTableNamesInBackupImages(backupIds); 112 113 BackupInfo bInfo = table.readBackupInfo(backupIds[0]); 114 String backupRoot = bInfo.getBackupRootDir(); 115 Path backupRootPath = new Path(backupRoot); 116 fs = backupRootPath.getFileSystem(conf); 117 118 for (int i = 0; i < tableNames.length; i++) { 119 LOG.info("Merge backup images for " + tableNames[i]); 120 121 // Find input directories for table 122 Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds); 123 String dirs = StringUtils.join(dirPaths, ","); 124 125 // bulkOutputPath should be on the same filesystem as backupRoot 126 Path tmpRestoreOutputDir = HBackupFileSystem.getBackupTmpDirPath(backupRoot); 127 Path bulkOutputPath = BackupUtils.getBulkOutputDir(tmpRestoreOutputDir, 128 BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false); 129 // Delete content if exists 130 if (fs.exists(bulkOutputPath)) { 131 if (!fs.delete(bulkOutputPath, true)) { 132 LOG.warn("Can not delete: " + bulkOutputPath); 133 } 134 } 135 Configuration conf = getConf(); 136 conf.set(bulkOutputConfKey, bulkOutputPath.toString()); 137 String[] playerArgs = { dirs, tableNames[i].getNameAsString() }; 138 139 player.setConf(getConf()); 140 int result = player.run(playerArgs); 141 if (!succeeded(result)) { 142 throw new IOException("Can not merge backup images for " + dirs 143 + " (check Hadoop/MR and HBase logs). Player return code =" + result); 144 } 145 // Add to processed table list 146 processedTableList.add(new Pair<>(tableNames[i], bulkOutputPath)); 147 LOG.debug("Merge Job finished:" + result); 148 } 149 List<TableName> tableList = toTableNameList(processedTableList); 150 table.updateProcessedTablesForMerge(tableList); 151 finishedTables = true; 152 153 // PHASE 2 (modification of a backup file system) 154 // Move existing mergedBackupId data into tmp directory 155 // we will need it later in case of a failure 156 Path tmpBackupDir = 157 HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot, mergedBackupId); 158 Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId); 159 160 if (!fs.rename(backupDirPath, tmpBackupDir)) { 161 throw new IOException("Failed to rename " + backupDirPath + " to " + tmpBackupDir); 162 } else { 163 LOG.debug("Renamed " + backupDirPath + " to " + tmpBackupDir); 164 } 165 // Move new data into backup dest 166 for (Pair<TableName, Path> tn : processedTableList) { 167 moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId); 168 } 169 // Update backup manifest 170 List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId); 171 updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete); 172 // Copy meta files back from tmp to backup dir 173 copyMetaData(fs, tmpBackupDir, backupDirPath); 174 // Delete tmp dir (Rename back during repair) 175 if (!fs.delete(tmpBackupDir, true)) { 176 // WARN and ignore 177 LOG.warn("Could not delete tmp dir: " + tmpBackupDir); 178 } 179 // Delete old data 180 deleteBackupImages(backupsToDelete, conn, fs, backupRoot); 181 // Finish merge session 182 table.finishMergeOperation(); 183 // Release lock 184 table.finishBackupExclusiveOperation(); 185 } catch (RuntimeException e) { 186 187 throw e; 188 } catch (Exception e) { 189 LOG.error(e.toString(), e); 190 if (!finishedTables) { 191 // cleanup bulk directories and finish merge 192 // merge MUST be repeated (no need for repair) 193 if (fs != null) { 194 cleanupBulkLoadDirs(fs, toPathList(processedTableList)); 195 } 196 table.finishMergeOperation(); 197 table.finishBackupExclusiveOperation(); 198 throw new IOException("Backup merge operation failed, you should try it again", e); 199 } else { 200 // backup repair must be run 201 throw new IOException( 202 "Backup merge operation failed, run backup repair tool to restore system's integrity", e); 203 } 204 } finally { 205 table.close(); 206 conn.close(); 207 } 208 } 209 210 /** 211 * Copy meta data to of a backup session 212 * @param fs file system 213 * @param tmpBackupDir temp backup directory, where meta is locaed 214 * @param backupDirPath new path for backup 215 * @throws IOException exception 216 */ 217 protected void copyMetaData(FileSystem fs, Path tmpBackupDir, Path backupDirPath) 218 throws IOException { 219 RemoteIterator<LocatedFileStatus> it = fs.listFiles(tmpBackupDir, true); 220 List<Path> toKeep = new ArrayList<Path>(); 221 while (it.hasNext()) { 222 Path p = it.next().getPath(); 223 if (fs.isDirectory(p)) { 224 continue; 225 } 226 // Keep meta 227 String fileName = p.toString(); 228 if ( 229 fileName.indexOf(FSTableDescriptors.TABLEINFO_DIR) > 0 230 || fileName.indexOf(HRegionFileSystem.REGION_INFO_FILE) > 0 231 || fileName.indexOf(BackupManifest.MANIFEST_FILE_NAME) > 0 232 ) { 233 toKeep.add(p); 234 } 235 } 236 // Copy meta to destination 237 for (Path p : toKeep) { 238 Path newPath = convertToDest(p, backupDirPath); 239 LOG.info("Copying tmp metadata from {} to {}", p, newPath); 240 copyFile(fs, p, newPath); 241 } 242 } 243 244 /** 245 * Copy file in DFS from p to newPath 246 * @param fs file system 247 * @param p old path 248 * @param newPath new path 249 * @throws IOException exception 250 */ 251 protected void copyFile(FileSystem fs, Path p, Path newPath) throws IOException { 252 try (InputStream in = fs.open(p); OutputStream out = fs.create(newPath, true)) { 253 IOUtils.copy(in, out); 254 } 255 boolean exists = fs.exists(newPath); 256 if (!exists) { 257 throw new IOException("Failed to copy meta file to: " + newPath); 258 } 259 } 260 261 /** 262 * Converts path before copying 263 * @param p path 264 * @param backupDirPath backup root 265 * @return converted path 266 */ 267 protected Path convertToDest(Path p, Path backupDirPath) { 268 String backupId = backupDirPath.getName(); 269 Deque<String> stack = new ArrayDeque<String>(); 270 String name = null; 271 while (true) { 272 name = p.getName(); 273 if (!name.equals(backupId)) { 274 stack.push(name); 275 p = p.getParent(); 276 } else { 277 break; 278 } 279 } 280 Path newPath = new Path(backupDirPath.toString()); 281 while (!stack.isEmpty()) { 282 newPath = new Path(newPath, stack.pop()); 283 } 284 return newPath; 285 } 286 287 protected List<Path> toPathList(List<Pair<TableName, Path>> processedTableList) { 288 ArrayList<Path> list = new ArrayList<>(); 289 for (Pair<TableName, Path> p : processedTableList) { 290 list.add(p.getSecond()); 291 } 292 return list; 293 } 294 295 protected List<TableName> toTableNameList(List<Pair<TableName, Path>> processedTableList) { 296 ArrayList<TableName> list = new ArrayList<>(); 297 for (Pair<TableName, Path> p : processedTableList) { 298 list.add(p.getFirst()); 299 } 300 return list; 301 } 302 303 protected void cleanupBulkLoadDirs(FileSystem fs, List<Path> pathList) throws IOException { 304 for (Path path : pathList) { 305 if (!fs.delete(path, true)) { 306 LOG.warn("Can't delete " + path); 307 } 308 } 309 } 310 311 protected void updateBackupManifest(String backupRoot, String mergedBackupId, 312 List<String> backupsToDelete) throws IllegalArgumentException, IOException { 313 BackupManifest manifest = 314 HBackupFileSystem.getManifest(conf, new Path(backupRoot), mergedBackupId); 315 LOG.info("Removing ancestors from merged backup {} : {}", mergedBackupId, backupsToDelete); 316 manifest.getBackupImage().removeAncestors(backupsToDelete); 317 // save back 318 LOG.info("Creating new manifest file for merged backup {} at root {}", mergedBackupId, 319 backupRoot); 320 manifest.store(conf); 321 } 322 323 protected void deleteBackupImages(List<String> backupIds, Connection conn, FileSystem fs, 324 String backupRoot) throws IOException { 325 // Delete from backup system table 326 try (BackupSystemTable table = new BackupSystemTable(conn)) { 327 for (String backupId : backupIds) { 328 LOG.info("Removing metadata for backup {}", backupId); 329 table.deleteBackupInfo(backupId); 330 } 331 } 332 333 // Delete from file system 334 for (String backupId : backupIds) { 335 LOG.info("Purging backup {} from FileSystem", backupId); 336 Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, backupId); 337 338 if (!fs.delete(backupDirPath, true)) { 339 LOG.warn("Could not delete " + backupDirPath); 340 } 341 } 342 } 343 344 protected List<String> getBackupIdsToDelete(String[] backupIds, String mergedBackupId) { 345 List<String> list = new ArrayList<>(); 346 for (String id : backupIds) { 347 if (id.equals(mergedBackupId)) { 348 continue; 349 } 350 list.add(id); 351 } 352 return list; 353 } 354 355 protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath, 356 TableName tableName, String mergedBackupId) throws IllegalArgumentException, IOException { 357 Path dest = 358 new Path(HBackupFileSystem.getTableBackupDir(backupRoot, mergedBackupId, tableName)); 359 360 FileStatus[] fsts = fs.listStatus(bulkOutputPath); 361 for (FileStatus fst : fsts) { 362 if (fst.isDirectory()) { 363 String family = fst.getPath().getName(); 364 Path newDst = new Path(dest, family); 365 if (fs.exists(newDst)) { 366 if (!fs.delete(newDst, true)) { 367 throw new IOException("failed to delete :" + newDst); 368 } 369 } else { 370 fs.mkdirs(dest); 371 } 372 boolean result = fs.rename(fst.getPath(), dest); 373 LOG.debug("MoveData from " + fst.getPath() + " to " + dest + " result=" + result); 374 } 375 } 376 } 377 378 protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException { 379 Set<TableName> allSet = new HashSet<>(); 380 381 try (Connection conn = ConnectionFactory.createConnection(conf); 382 BackupSystemTable table = new BackupSystemTable(conn)) { 383 for (String backupId : backupIds) { 384 BackupInfo bInfo = table.readBackupInfo(backupId); 385 386 allSet.addAll(bInfo.getTableNames()); 387 } 388 } 389 390 TableName[] ret = new TableName[allSet.size()]; 391 return allSet.toArray(ret); 392 } 393 394 protected Path[] findInputDirectories(FileSystem fs, String backupRoot, TableName tableName, 395 String[] backupIds) throws IOException { 396 List<Path> dirs = new ArrayList<>(); 397 398 for (String backupId : backupIds) { 399 Path fileBackupDirPath = 400 new Path(HBackupFileSystem.getTableBackupDir(backupRoot, backupId, tableName)); 401 if (fs.exists(fileBackupDirPath)) { 402 dirs.add(fileBackupDirPath); 403 } else { 404 if (LOG.isDebugEnabled()) { 405 LOG.debug("File: " + fileBackupDirPath + " does not exist."); 406 } 407 } 408 } 409 Path[] ret = new Path[dirs.size()]; 410 return dirs.toArray(ret); 411 } 412}