/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupMergeJob;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce implementation of {@link BackupMergeJob}. Must be initialized with the configuration
 * of the backup destination cluster.
 */
@InterfaceAudience.Private
public class MapReduceBackupMergeJob implements BackupMergeJob {
  public static final Logger LOG = LoggerFactory.getLogger(MapReduceBackupMergeJob.class);

  protected Tool player;
  protected Configuration conf;

  public MapReduceBackupMergeJob() {
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public void run(String[] backupIds) throws IOException {
    String bulkOutputConfKey;

    // TODO: run player on remote cluster
    player = new MapReduceHFileSplitterJob();
    bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
    // The player reads all files in an arbitrary directory structure and creates
    // a Map task for each file
    String bids = StringUtils.join(backupIds, ",");

    if (LOG.isDebugEnabled()) {
      LOG.debug("Merge backup images " + bids);
    }

    List<Pair<TableName, Path>> processedTableList = new ArrayList<>();
    boolean finishedTables = false;
    Connection conn = ConnectionFactory.createConnection(getConf());
    BackupSystemTable table = new BackupSystemTable(conn);
    FileSystem fs = FileSystem.get(getConf());

    try {
      // Get exclusive lock on the backup system
      table.startBackupExclusiveOperation();
      // Start merge operation
      table.startMergeOperation(backupIds);

      // Select the most recent backup id
      String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);

      TableName[] tableNames = getTableNamesInBackupImages(backupIds);

      BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
      String backupRoot = bInfo.getBackupRootDir();

      for (int i = 0; i < tableNames.length; i++) {
        LOG.info("Merge backup images for " + tableNames[i]);

        // Find input directories for the table
        Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
        String dirs = StringUtils.join(dirPaths, ",");

        Path bulkOutputPath = BackupUtils.getBulkOutputDir(
          BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false);
        // Delete content if it exists
        if (fs.exists(bulkOutputPath)) {
          if (!fs.delete(bulkOutputPath, true)) {
            LOG.warn("Can not delete: " + bulkOutputPath);
          }
        }
        Configuration conf = getConf();
        conf.set(bulkOutputConfKey, bulkOutputPath.toString());
        String[] playerArgs = { dirs, tableNames[i].getNameAsString() };

        player.setConf(getConf());
        int result = player.run(playerArgs);
        if (!succeeded(result)) {
          throw new IOException("Can not merge backup images for " + dirs
            + " (check Hadoop/MR and HBase logs). Player return code =" + result);
        }
        // Add to processed table list
        processedTableList.add(new Pair<>(tableNames[i], bulkOutputPath));
        LOG.debug("Merge Job finished:" + result);
      }
      List<TableName> tableList = toTableNameList(processedTableList);
      table.updateProcessedTablesForMerge(tableList);
      finishedTables = true;

      // PHASE 2 (modification of the backup file system)
      // Move existing mergedBackupId data into a tmp directory;
      // we will need it later in case of a failure
      Path tmpBackupDir =
        HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot, mergedBackupId);
      Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);

      if (!fs.rename(backupDirPath, tmpBackupDir)) {
        throw new IOException("Failed to rename " + backupDirPath + " to " + tmpBackupDir);
      } else {
        LOG.debug("Renamed " + backupDirPath + " to " + tmpBackupDir);
      }
      // Move new data into the backup destination
      for (Pair<TableName, Path> tn : processedTableList) {
        moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
      }
      // Update backup manifest
      List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
      updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
      // Copy meta files back from tmp to the backup dir
      copyMetaData(fs, tmpBackupDir, backupDirPath);
      // Delete tmp dir (renamed back during repair)
      if (!fs.delete(tmpBackupDir, true)) {
        // WARN and ignore
        LOG.warn("Could not delete tmp dir: " + tmpBackupDir);
      }
      // Delete old data
      deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
      // Finish merge session
      table.finishMergeOperation();
      // Release lock
      table.finishBackupExclusiveOperation();
    } catch (RuntimeException e) {
      throw e;
    } catch (Exception e) {
      LOG.error(e.toString(), e);
      if (!finishedTables) {
        // Clean up bulk load directories and finish the merge;
        // the merge MUST be repeated (no need for repair)
        cleanupBulkLoadDirs(fs, toPathList(processedTableList));
        table.finishMergeOperation();
        table.finishBackupExclusiveOperation();
        throw new IOException("Backup merge operation failed, you should try it again", e);
      } else {
        // backup repair must be run
        throw new IOException(
          "Backup merge operation failed, run backup repair tool to restore system's integrity", e);
      }
    } finally {
      table.close();
      conn.close();
    }
  }

  /**
   * Copies the meta data of a backup session.
   * @param fs file system
   * @param tmpBackupDir temp backup directory, where the meta data is located
   * @param backupDirPath new path for the backup
   * @throws IOException exception
   */
  protected void copyMetaData(FileSystem fs, Path tmpBackupDir, Path backupDirPath)
    throws IOException {
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(tmpBackupDir, true);
    List<Path> toKeep = new ArrayList<Path>();
    while (it.hasNext()) {
      Path p = it.next().getPath();
      if (fs.isDirectory(p)) {
        continue;
      }
      // Keep meta
      String fileName = p.toString();
      if (
        fileName.indexOf(FSTableDescriptors.TABLEINFO_DIR) > 0
          || fileName.indexOf(HRegionFileSystem.REGION_INFO_FILE) > 0
      ) {
        toKeep.add(p);
      }
    }
    // Copy meta to destination
    for (Path p : toKeep) {
      Path newPath = convertToDest(p, backupDirPath);
      copyFile(fs, p, newPath);
    }
  }

  /**
   * Copies a file in DFS from p to newPath.
   * @param fs file system
   * @param p old path
   * @param newPath new path
   * @throws IOException exception
   */
  protected void copyFile(FileSystem fs, Path p, Path newPath) throws IOException {
    try (InputStream in = fs.open(p); OutputStream out = fs.create(newPath, true)) {
      IOUtils.copy(in, out);
    }
    boolean exists = fs.exists(newPath);
    if (!exists) {
      throw new IOException("Failed to copy meta file to: " + newPath);
    }
  }

  /**
   * Converts a path under the tmp backup directory to the corresponding path under the backup
   * directory before copying.
   * @param p path
   * @param backupDirPath backup root
   * @return converted path
   */
  protected Path convertToDest(Path p, Path backupDirPath) {
    String backupId = backupDirPath.getName();
    Deque<String> stack = new ArrayDeque<String>();
    String name = null;
    while (true) {
      name = p.getName();
      if (!name.equals(backupId)) {
        stack.push(name);
        p = p.getParent();
      } else {
        break;
      }
    }
    Path newPath = new Path(backupDirPath.toString());
    while (!stack.isEmpty()) {
      newPath = new Path(newPath, stack.pop());
    }
    return newPath;
  }

  protected List<Path> toPathList(List<Pair<TableName, Path>> processedTableList) {
    ArrayList<Path> list = new ArrayList<>();
    for (Pair<TableName, Path> p : processedTableList) {
      list.add(p.getSecond());
    }
    return list;
  }

  protected List<TableName> toTableNameList(List<Pair<TableName, Path>> processedTableList) {
    ArrayList<TableName> list = new ArrayList<>();
    for (Pair<TableName, Path> p : processedTableList) {
      list.add(p.getFirst());
    }
    return list;
  }

  protected void cleanupBulkLoadDirs(FileSystem fs, List<Path> pathList) throws IOException {
    for (Path path : pathList) {
      if (!fs.delete(path, true)) {
        LOG.warn("Can't delete " + path);
      }
    }
  }

  protected void updateBackupManifest(String backupRoot, String mergedBackupId,
    List<String> backupsToDelete) throws IllegalArgumentException, IOException {
    BackupManifest manifest =
      HBackupFileSystem.getManifest(conf, new Path(backupRoot), mergedBackupId);
    manifest.getBackupImage().removeAncestors(backupsToDelete);
    // save back
    manifest.store(conf);
  }

  protected void deleteBackupImages(List<String> backupIds, Connection conn, FileSystem fs,
    String backupRoot) throws IOException {
    // Delete from the backup system table
    try (BackupSystemTable table = new BackupSystemTable(conn)) {
      for (String backupId : backupIds) {
        table.deleteBackupInfo(backupId);
      }
    }

    // Delete from the file system
    for (String backupId : backupIds) {
      Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, backupId);

      if (!fs.delete(backupDirPath, true)) {
        LOG.warn("Could not delete " + backupDirPath);
      }
    }
  }

  protected List<String> getBackupIdsToDelete(String[] backupIds, String mergedBackupId) {
    List<String> list = new ArrayList<>();
    for (String id : backupIds) {
      if (id.equals(mergedBackupId)) {
        continue;
      }
      list.add(id);
    }
    return list;
  }

  protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath,
    TableName tableName, String mergedBackupId) throws IllegalArgumentException, IOException {
    Path dest =
      new Path(HBackupFileSystem.getTableBackupDir(backupRoot, mergedBackupId, tableName));

    FileStatus[] fsts = fs.listStatus(bulkOutputPath);
    for (FileStatus fst : fsts) {
      if (fst.isDirectory()) {
        String family = fst.getPath().getName();
        Path newDst = new Path(dest, family);
        if (fs.exists(newDst)) {
          if (!fs.delete(newDst, true)) {
            throw new IOException("failed to delete :" + newDst);
          }
        } else {
          fs.mkdirs(dest);
        }
        boolean result = fs.rename(fst.getPath(), dest);
        LOG.debug("MoveData from " + fst.getPath() + " to " + dest + " result=" + result);
      }
    }
  }

  protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException {
    Set<TableName> allSet = new HashSet<>();

    try (Connection conn = ConnectionFactory.createConnection(conf);
      BackupSystemTable table = new BackupSystemTable(conn)) {
      for (String backupId : backupIds) {
        BackupInfo bInfo = table.readBackupInfo(backupId);

        allSet.addAll(bInfo.getTableNames());
      }
    }

    TableName[] ret = new TableName[allSet.size()];
    return allSet.toArray(ret);
  }

  protected Path[] findInputDirectories(FileSystem fs, String backupRoot, TableName tableName,
    String[] backupIds) throws IOException {
    List<Path> dirs = new ArrayList<>();

    for (String backupId : backupIds) {
      Path fileBackupDirPath =
        new Path(HBackupFileSystem.getTableBackupDir(backupRoot, backupId, tableName));
      if (fs.exists(fileBackupDirPath)) {
        dirs.add(fileBackupDirPath);
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("File: " + fileBackupDirPath + " does not exist.");
        }
      }
    }
    Path[] ret = new Path[dirs.size()];
    return dirs.toArray(ret);
  }
}
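// A minimal usage sketch (added for illustration, not part of the upstream source): the class
// javadoc notes the job must be initialized with the configuration of the backup destination
// cluster, so a caller would pass that configuration via setConf() and then hand the backup ids
// to merge to run(). The Configuration source and the backup id strings below are hypothetical
// placeholders.
//
//   Configuration conf = HBaseConfiguration.create(); // destination cluster configuration
//   BackupMergeJob mergeJob = new MapReduceBackupMergeJob();
//   mergeJob.setConf(conf);
//   mergeJob.run(new String[] { "backup_1712345678901", "backup_1712345678902" });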