/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Incremental backup implementation. See the {@link #execute() execute} method.
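 * <p>
 * The client gathers the WAL files written since the previous backup, converts them into HFiles
 * with {@link WALPlayer}, copies the result (together with any bulk-loaded HFiles recorded in the
 * backup system table) to the backup destination, and finally persists the new log timestamps and
 * backup start code so the next incremental backup knows where to resume.
 * <p>
 * A rough usage sketch follows. In practice instances are created by the backup framework (backup
 * admin API / command line) rather than constructed directly; {@code conf}, {@code tables},
 * {@code backupRootDir} and {@code backupId} are assumed to be prepared by the caller:
 *
 * <pre>{@code
 * try (Connection conn = ConnectionFactory.createConnection(conf)) {
 *   BackupRequest request = new BackupRequest.Builder()
 *     .withBackupType(BackupType.INCREMENTAL)
 *     .withTableList(tables)
 *     .withTargetRootDir(backupRootDir)
 *     .build();
 *   new IncrementalTableBackupClient(conn, backupId, request).execute();
 * }
 * }</pre>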
 */
@InterfaceAudience.Private
public class IncrementalTableBackupClient extends TableBackupClient {
  private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class);

  protected IncrementalTableBackupClient() {
  }

  public IncrementalTableBackupClient(final Connection conn, final String backupId,
    BackupRequest request) throws IOException {
    super(conn, backupId, request);
  }

  protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
    List<String> list = new ArrayList<>();
    for (String file : incrBackupFileList) {
      Path p = new Path(file);
      if (fs.exists(p) || isActiveWalPath(p)) {
        list.add(file);
      } else {
        LOG.warn("Can't find file: " + file);
      }
    }
    return list;
  }

  /**
   * Check if a given path belongs to the active WAL directory.
   * @param p path to check
   * @return true if the file has not been archived yet
   */
  protected boolean isActiveWalPath(Path p) {
    return !AbstractFSWALProvider.isArchivedLogFile(p);
  }

  protected static int getIndex(TableName tbl, List<TableName> sTableList) {
    if (sTableList == null) {
      return 0;
    }

    for (int i = 0; i < sTableList.size(); i++) {
      if (tbl.equals(sTableList.get(i))) {
        return i;
      }
    }
    return -1;
  }

  /*
   * Reads bulk load records from the backup system table, iterates through the records and forms
   * the paths of the bulk loaded hfiles, then copies those hfiles to the backup destination.
   * @param sTableList list of tables to be backed up
   * @return map of table to list of files
   */
  @SuppressWarnings("unchecked")
  protected Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList)
    throws IOException {
    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
    List<String> activeFiles = new ArrayList<>();
    List<String> archiveFiles = new ArrayList<>();
    Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
      backupManager.readBulkloadRows(sTableList);
    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
    FileSystem tgtFs;
    try {
      tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
    } catch (URISyntaxException use) {
      throw new IOException("Unable to get FileSystem", use);
    }
    Path rootdir = CommonFSUtils.getRootDir(conf);
    Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);

    for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry : map
      .entrySet()) {
      TableName srcTable = tblEntry.getKey();

      int srcIdx = getIndex(srcTable, sTableList);
      if (srcIdx < 0) {
        LOG.warn("Couldn't find " + srcTable + " in source table List");
        continue;
      }
      if (mapForSrc[srcIdx] == null) {
        mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      }
      Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
      Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
        srcTable.getQualifierAsString());
      for (Map.Entry<String, Map<String, List<Pair<String, Boolean>>>> regionEntry : tblEntry
        .getValue().entrySet()) {
        String regionName = regionEntry.getKey();
        Path regionDir = new Path(tblDir, regionName);
        // map from family to List of hfiles
        for (Map.Entry<String, List<Pair<String, Boolean>>> famEntry : regionEntry.getValue()
          .entrySet()) {
          String fam = famEntry.getKey();
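          // For this family, record every bulk-loaded HFile that still sits in the active store
          // directory and every one that has already been moved to the archive (e.g. after a
          // compaction); both lists are copied to the backup destination below.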
          Path famDir = new Path(regionDir, fam);
          List<Path> files;
          if (!mapForSrc[srcIdx].containsKey(Bytes.toBytes(fam))) {
            files = new ArrayList<>();
            mapForSrc[srcIdx].put(Bytes.toBytes(fam), files);
          } else {
            files = mapForSrc[srcIdx].get(Bytes.toBytes(fam));
          }
          Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
          String tblName = srcTable.getQualifierAsString();
          Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
          if (!tgtFs.mkdirs(tgtFam)) {
            throw new IOException("couldn't create " + tgtFam);
          }
          for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
            String file = fileWithState.getFirst();
            int idx = file.lastIndexOf("/");
            String filename = file;
            if (idx > 0) {
              filename = file.substring(idx + 1);
            }
            Path p = new Path(famDir, filename);
            Path tgt = new Path(tgtFam, filename);
            Path archive = new Path(archiveDir, filename);
            if (fs.exists(p)) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
                LOG.trace("copying " + p + " to " + tgt);
              }
              activeFiles.add(p.toString());
            } else if (fs.exists(archive)) {
              LOG.debug("copying archive " + archive + " to " + tgt);
              archiveFiles.add(archive.toString());
            }
            files.add(tgt);
          }
        }
      }
    }

    copyBulkLoadedFiles(activeFiles, archiveFiles);
    backupManager.deleteBulkLoadedRows(pair.getSecond());
    return mapForSrc;
  }

  private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    try {
      // Enable special mode of BackupDistCp
      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
      // Copy active files
      String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
      int attempt = 1;
      while (activeFiles.size() > 0) {
        LOG.info("Copy " + activeFiles.size() + " active bulk loaded files. Attempt = " + attempt++);
        String[] toCopy = new String[activeFiles.size()];
        activeFiles.toArray(toCopy);
        // An active file can be archived during the copy operation;
        // we need to handle this properly.
        try {
          incrementalCopyHFiles(toCopy, tgtDest);
          break;
        } catch (IOException e) {
          // Check whether some files got archived and update the active and archived lists:
          // when a file is moved from the active to the archive directory, the number of
          // active files decreases.
          int numOfActive = activeFiles.size();
          updateFileLists(activeFiles, archiveFiles);
          if (activeFiles.size() < numOfActive) {
            continue;
          }
          // If no file was archived, rethrow the exception.
          throw e;
        }
      }
      // If the incremental copy fails for the archived files, we will have partially loaded files
      // in the backup destination (only files from the active data directory). That is OK because
      // the backup will be marked as FAILED and its data will be cleaned up.
      if (archiveFiles.size() > 0) {
        String[] toCopy = new String[archiveFiles.size()];
        archiveFiles.toArray(toCopy);
        incrementalCopyHFiles(toCopy, tgtDest);
      }
    } finally {
      // Disable special mode of BackupDistCp
      conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
    }
  }

  private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    List<String> newlyArchived = new ArrayList<>();

    for (String spath : activeFiles) {
      if (!fs.exists(new Path(spath))) {
        newlyArchived.add(spath);
      }
    }

    if (newlyArchived.size() > 0) {
      activeFiles.removeAll(newlyArchived);
      archiveFiles.addAll(newlyArchived);
    }

    LOG.debug(newlyArchived.size() + " files have been archived.");
  }

  @Override
  public void execute() throws IOException {
    try {
      // case PREPARE_INCREMENTAL:
      beginBackup(backupManager, backupInfo);
      backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
      LOG.debug("For incremental backup, current table set is "
        + backupManager.getIncrementalBackupTableSet());
      newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
    } catch (Exception e) {
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }

    // case INCREMENTAL_COPY:
    try {
      // copy out the table and region info files for each table
      BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
      // convert WALs to HFiles and copy them to .tmp under BACKUP_ROOT
      convertWALsToHFiles();
      incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
        backupInfo.getBackupRootDir());
    } catch (Exception e) {
      String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
    // case INCR_BACKUP_COMPLETE:
    // Set the overall backup status to complete. Here we make sure the backup completes:
    // after this checkpoint, even if a cancel request comes in, the backup is allowed to finish.
    try {
      // Set the previousTimestampMap, which is from before the current log roll, in the manifest.
      Map<TableName, Map<String, Long>> previousTimestampMap = backupManager.readLogTimestampMap();
      backupInfo.setIncrTimestampMap(previousTimestampMap);

      // The table list in backupInfo is good for both full backup and incremental backup.
      // For incremental backup, it contains the incremental backup table set.
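      // Persist the new region server log-roll timestamps for the tables in this backup;
      // the next incremental backup will resume from these timestamps.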
      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
        backupManager.readLogTimestampMap();

      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
      Long newStartCode =
        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
      backupManager.writeBackupStartCode(newStartCode);

      handleBulkLoad(backupInfo.getTableNames());
      // backup complete
      completeBackup(conn, backupInfo, backupManager, BackupType.INCREMENTAL, conf);

    } catch (IOException e) {
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
  }

  protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
    try {
      LOG.debug("Incremental copy HFiles is starting. dest=" + backupDest);
      // set overall backup phase: incremental_copy
      backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
      // get incremental backup file list and prepare parameters for DistCp
      String[] strArr = new String[files.length + 1];
      System.arraycopy(files, 0, strArr, 0, files.length);
      strArr[strArr.length - 1] = backupDest;

      String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
      }
      conf.set(JOB_NAME_CONF_KEY, jobname);

      BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
      int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
      if (res != 0) {
        LOG.error("Copy of incremental HFiles failed with return code: " + res + ".");
        throw new IOException(
          "Failed copy from " + StringUtils.join(files, ',') + " to " + backupDest);
      }
      LOG.debug("Incremental copy HFiles from " + StringUtils.join(files, ',') + " to " + backupDest
        + " finished.");
    } finally {
      deleteBulkLoadDirectory();
    }
  }

  protected void deleteBulkLoadDirectory() throws IOException {
    // delete original bulk load directory on method exit
    Path path = getBulkOutputDir();
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    boolean result = fs.delete(path, true);
    if (!result) {
      LOG.warn("Could not delete " + path);
    }
  }

  protected void convertWALsToHFiles() throws IOException {
    // get incremental backup file list and prepare parameters for DistCp
    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
    // Get list of tables in incremental backup set
    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
    // filter missing files out (they have been copied by previous backups)
    incrBackupFileList = filterMissingFiles(incrBackupFileList);
    List<String> tableList = new ArrayList<String>();
    for (TableName table : tableSet) {
      // Check if table exists
      if (tableExists(table, conn)) {
        tableList.add(table.getNameAsString());
      } else {
        LOG.warn("Table " + table + " does not exist. Skipping in WAL converter");
      }
    }
    walToHFiles(incrBackupFileList, tableList);
  }

  protected boolean tableExists(TableName table, Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(table);
    }
  }

  protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
    Tool player = new WALPlayer();

    // WALPlayer reads all files under an arbitrary directory structure and creates
    // a Map task for each file. We use ';' as the separator
    // because WAL file names contain ','.
    String dirs = StringUtils.join(dirPaths, ';');
    String jobname = "Incremental_Backup-" + backupId;

    Path bulkOutputPath = getBulkOutputDir();
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
    conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true);
    conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
    conf.set(JOB_NAME_CONF_KEY, jobname);
    String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };

    try {
      player.setConf(conf);
      int result = player.run(playerArgs);
      if (result != 0) {
        throw new IOException("WAL Player failed");
      }
      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
      conf.unset(JOB_NAME_CONF_KEY);
    } catch (IOException e) {
      throw e;
    } catch (Exception ee) {
      throw new IOException("Cannot convert from directory " + dirs
        + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
    }
  }

  protected Path getBulkOutputDirForTable(TableName table) {
    Path tablePath = getBulkOutputDir();
    tablePath = new Path(tablePath, table.getNamespaceAsString());
    tablePath = new Path(tablePath, table.getQualifierAsString());
    return new Path(tablePath, "data");
  }

  protected Path getBulkOutputDir() {
    String backupId = backupInfo.getBackupId();
    Path path = new Path(backupInfo.getBackupRootDir());
    path = new Path(path, ".tmp");
    path = new Path(path, backupId);
    return path;
  }
}