/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;

/**
 * Incremental backup implementation. See the {@link #execute() execute} method.
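 * <p>
 * A minimal usage sketch: this client is normally driven through the backup admin API rather than
 * instantiated directly (the table name and target directory below are illustrative only):
 *
 * <pre>
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *   BackupAdmin admin = new BackupAdminImpl(conn)) {
 *   BackupRequest request = new BackupRequest.Builder().withBackupType(BackupType.INCREMENTAL)
 *     .withTableList(Lists.newArrayList(TableName.valueOf("my_table")))
 *     .withTargetRootDir("hdfs://namenode:8020/backup").build();
 *   String backupId = admin.backupTables(request);
 * }
 * </pre>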
 */
@InterfaceAudience.Private
public class IncrementalTableBackupClient extends TableBackupClient {
  private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class);

  protected IncrementalTableBackupClient() {
  }

  public IncrementalTableBackupClient(final Connection conn, final String backupId,
    BackupRequest request) throws IOException {
    super(conn, backupId, request);
  }

  protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
    List<String> list = new ArrayList<>();
    for (String file : incrBackupFileList) {
      Path p = new Path(file);
      if (fs.exists(p) || isActiveWalPath(p)) {
        list.add(file);
      } else {
        LOG.warn("Can't find file: " + file);
      }
    }
    return list;
  }

  /**
   * Check if a given path belongs to the active WAL directory
   * @param p path
   * @return true if it does
   */
  protected boolean isActiveWalPath(Path p) {
    return !AbstractFSWALProvider.isArchivedLogFile(p);
  }

  protected static int getIndex(TableName tbl, List<TableName> sTableList) {
    if (sTableList == null) {
      return 0;
    }

    for (int i = 0; i < sTableList.size(); i++) {
      if (tbl.equals(sTableList.get(i))) {
        return i;
      }
    }
    return -1;
  }

  /**
   * Reads bulk load records from the backup table, iterates through the records, and forms the
   * paths for the bulk loaded hfiles. Copies the bulk loaded hfiles to the backup destination.
   * This method does NOT clean up the entries in the bulk load system table. Those entries should
   * not be cleaned until the backup is marked as complete.
   * @param tablesToBackup list of tables to be backed up
   */
  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
    List<String> activeFiles = new ArrayList<>();
    List<String> archiveFiles = new ArrayList<>();
    List<BulkLoad> bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
    FileSystem tgtFs;
    try {
      tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
    } catch (URISyntaxException use) {
      throw new IOException("Unable to get FileSystem", use);
    }
    Path rootdir = CommonFSUtils.getRootDir(conf);
    Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);

    for (BulkLoad bulkLoad : bulkLoads) {
      TableName srcTable = bulkLoad.getTableName();
      String regionName = bulkLoad.getRegion();
      String fam = bulkLoad.getColumnFamily();
      String filename = FilenameUtils.getName(bulkLoad.getHfilePath());

      if (!tablesToBackup.contains(srcTable)) {
        LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
        continue;
      }
      Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
      Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);

      String srcTableQualifier = srcTable.getQualifierAsString();
      String srcTableNs = srcTable.getNamespaceAsString();
      Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
        + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
      if (!tgtFs.mkdirs(tgtFam)) {
        throw new IOException("couldn't create " + tgtFam);
      }
      Path tgt = new Path(tgtFam, filename);

      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
      Path archive = new Path(archiveDir, filename);
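      // Prefer the hfile still present under the table's data directory; if it has already been
      // moved to the archive, fall back to the archived copy so the bulk load is still captured.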
      if (fs.exists(p)) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.getHfilePath(), p.getParent(),
            srcTableQualifier);
          LOG.trace("copying {} to {}", p, tgt);
        }
        activeFiles.add(p.toString());
      } else if (fs.exists(archive)) {
        LOG.debug("copying archive {} to {}", archive, tgt);
        archiveFiles.add(archive.toString());
      }
    }

    copyBulkLoadedFiles(activeFiles, archiveFiles);
    return bulkLoads;
  }

  private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    try {
      // Enable special mode of BackupDistCp
      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
      // Copy active files
      String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
      int attempt = 1;
      while (activeFiles.size() > 0) {
        LOG.info("Copy " + activeFiles.size() + " active bulk loaded files. Attempt = "
          + attempt++);
        String[] toCopy = new String[activeFiles.size()];
        activeFiles.toArray(toCopy);
        // An active file can be archived during the copy operation,
        // so we need to handle this properly
        try {
          incrementalCopyHFiles(toCopy, tgtDest);
          break;
        } catch (IOException e) {
          // Check if some files got archived
          // Update active and archived lists
          // When a file is moved from the active to the archive
          // directory, the number of active files decreases
          int numOfActive = activeFiles.size();
          updateFileLists(activeFiles, archiveFiles);
          if (activeFiles.size() < numOfActive) {
            continue;
          }
          // if not - throw exception
          throw e;
        }
      }
      // If the incremental copy fails for archived files, we will have partially loaded files
      // in the backup destination (only files from the active data directory). That is OK,
      // because the backup will be marked as FAILED and the data will be cleaned up
      if (archiveFiles.size() > 0) {
        String[] toCopy = new String[archiveFiles.size()];
        archiveFiles.toArray(toCopy);
        incrementalCopyHFiles(toCopy, tgtDest);
      }
    } finally {
      // Disable special mode of BackupDistCp
      conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
    }
  }

  private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    List<String> newlyArchived = new ArrayList<>();

    for (String spath : activeFiles) {
      if (!fs.exists(new Path(spath))) {
        newlyArchived.add(spath);
      }
    }

    if (newlyArchived.size() > 0) {
      activeFiles.removeAll(newlyArchived);
      archiveFiles.addAll(newlyArchived);
    }

    LOG.debug(newlyArchived.size() + " files have been archived.");
  }

  /**
   * @throws IOException                   If the execution of the backup fails
   * @throws ColumnFamilyMismatchException If the column families of the current table do not
   *                                       match the column families for the last full backup.
   *                                       In that case, a full backup should be taken instead.
   */
  @Override
  public void execute() throws IOException, ColumnFamilyMismatchException {
    try {
      Map<TableName, String> tablesToFullBackupIds = getFullBackupIds();
      verifyCfCompatibility(backupInfo.getTables(), tablesToFullBackupIds);

      // case PREPARE_INCREMENTAL:
      beginBackup(backupManager, backupInfo);
      backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
      LOG.debug("For incremental backup, current table set is "
        + backupManager.getIncrementalBackupTableSet());
      newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
    } catch (Exception e) {
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }

    // case INCREMENTAL_COPY:
    try {
      // copy out the table and region info files for each table
      BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
      // convert WALs to HFiles and copy them to .tmp under BACKUP_ROOT
      convertWALsToHFiles();
      incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
        backupInfo.getBackupRootDir());
    } catch (Exception e) {
      String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
    // case INCR_BACKUP_COMPLETE:
    // set overall backup status: complete. Here we make sure the backup is completed.
    // After this checkpoint, even if the cancel process starts, the backup is allowed to finish
    try {
      // Set the previousTimestampMap, which is from before the current log roll, in the manifest.
      Map<TableName, Map<String, Long>> previousTimestampMap = backupManager.readLogTimestampMap();
      backupInfo.setIncrTimestampMap(previousTimestampMap);

      // The table list in backupInfo is good for both full backup and incremental backup.
      // For incremental backup, it contains the incremental backup table set.
      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
        backupManager.readLogTimestampMap();

      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
      Long newStartCode =
        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
      backupManager.writeBackupStartCode(newStartCode);

      List<BulkLoad> bulkLoads = handleBulkLoad(backupInfo.getTableNames());

      // backup complete
      completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf);

      List<byte[]> bulkLoadedRows = Lists.transform(bulkLoads, BulkLoad::getRowKey);
      backupManager.deleteBulkLoadedRows(bulkLoadedRows);
    } catch (IOException e) {
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
  }

  protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
    try {
      LOG.debug("Incremental copy HFiles is starting. dest=" + backupDest);
      // set overall backup phase: incremental_copy
      backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
      // get incremental backup file list and prepare params for DistCp
      String[] strArr = new String[files.length + 1];
      System.arraycopy(files, 0, strArr, 0, files.length);
      strArr[strArr.length - 1] = backupDest;

      String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
      }
      conf.set(JOB_NAME_CONF_KEY, jobname);

      BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
      int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
      if (res != 0) {
        LOG.error("Copy incremental HFile files failed with return code: " + res + ".");
        throw new IOException(
          "Failed copy from " + StringUtils.join(files, ',') + " to " + backupDest);
      }
      LOG.debug("Incremental copy HFiles from " + StringUtils.join(files, ',') + " to " + backupDest
        + " finished.");
    } finally {
      deleteBulkLoadDirectory();
    }
  }

  protected void deleteBulkLoadDirectory() throws IOException {
    // delete original bulk load directory on method exit
    Path path = getBulkOutputDir();
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    boolean result = fs.delete(path, true);
    if (!result) {
      LOG.warn("Could not delete " + path);
    }
  }

  protected void convertWALsToHFiles() throws IOException {
    // get incremental backup file list and prepare parameters for DistCp
    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
    // Get list of tables in incremental backup set
    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
    // filter missing files out (they have been copied by previous backups)
    incrBackupFileList = filterMissingFiles(incrBackupFileList);
    List<String> tableList = new ArrayList<String>();
    for (TableName table : tableSet) {
      // Check if table exists
      if (tableExists(table, conn)) {
        tableList.add(table.getNameAsString());
      } else {
        LOG.warn("Table " + table + " does not exist. Skipping it in the WAL converter");
      }
    }
    walToHFiles(incrBackupFileList, tableList);
  }

  protected boolean tableExists(TableName table, Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(table);
    }
  }

  protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
    Tool player = new WALPlayer();

    // Player reads all files in an arbitrary directory structure and creates
    // a Map task for each file. We use ';' as the separator
    // because WAL file names contain ','.
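    // The player is configured below for multi-table bulk output: HFiles for all tables in the
    // list are written under the bulk output directory and later copied to the backup
    // destination by incrementalCopyHFiles.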
    String dirs = StringUtils.join(dirPaths, ';');
    String jobname = "Incremental_Backup-" + backupId;

    Path bulkOutputPath = getBulkOutputDir();
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
    conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
    conf.set(JOB_NAME_CONF_KEY, jobname);
    String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };

    try {
      player.setConf(conf);
      int result = player.run(playerArgs);
      if (result != 0) {
        throw new IOException("WAL Player failed");
      }
      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
      conf.unset(JOB_NAME_CONF_KEY);
    } catch (IOException e) {
      throw e;
    } catch (Exception ee) {
      throw new IOException("Cannot convert from directory " + dirs
        + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
    }
  }

  protected Path getBulkOutputDirForTable(TableName table) {
    Path tablePath = getBulkOutputDir();
    tablePath = new Path(tablePath, table.getNamespaceAsString());
    tablePath = new Path(tablePath, table.getQualifierAsString());
    return new Path(tablePath, "data");
  }

  protected Path getBulkOutputDir() {
    String backupId = backupInfo.getBackupId();
    Path path = new Path(backupInfo.getBackupRootDir());
    path = new Path(path, ".tmp");
    path = new Path(path, backupId);
    return path;
  }

  private Map<TableName, String> getFullBackupIds() throws IOException {
    // Ancestors are stored from newest to oldest, so we can iterate backwards
    // in order to populate our backupId map with the most recent full backup
    // for a given table
    List<BackupManifest.BackupImage> images = getAncestors(backupInfo);
    Map<TableName, String> results = new HashMap<>();
    for (int i = images.size() - 1; i >= 0; i--) {
      BackupManifest.BackupImage image = images.get(i);
      if (image.getType() != BackupType.FULL) {
        continue;
      }

      for (TableName tn : image.getTableNames()) {
        results.put(tn, image.getBackupId());
      }
    }
    return results;
  }

  /**
   * Verifies that the column families in the current table descriptor match the column families
   * recorded in the last full backup of the table. This ensures CF compatibility across
   * incremental backups.
   * If a mismatch is detected, a full table backup should be taken rather than an incremental
   * one.
   */
  private void verifyCfCompatibility(Set<TableName> tables,
    Map<TableName, String> tablesToFullBackupId) throws IOException, ColumnFamilyMismatchException {
    ColumnFamilyMismatchException.ColumnFamilyMismatchExceptionBuilder exBuilder =
      ColumnFamilyMismatchException.newBuilder();
    try (Admin admin = conn.getAdmin(); BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) {
      for (TableName tn : tables) {
        String backupId = tablesToFullBackupId.get(tn);
        BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(backupId);

        ColumnFamilyDescriptor[] currentCfs = admin.getDescriptor(tn).getColumnFamilies();
        String snapshotName = fullBackupInfo.getSnapshotName(tn);
        Path root = HBackupFileSystem.getTableBackupPath(tn,
          new Path(fullBackupInfo.getBackupRootDir()), fullBackupInfo.getBackupId());
        Path manifestDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root);

        FileSystem fs;
        try {
          fs = FileSystem.get(new URI(fullBackupInfo.getBackupRootDir()), conf);
        } catch (URISyntaxException e) {
          throw new IOException("Unable to get fs for backup " + fullBackupInfo.getBackupId(), e);
        }

        SnapshotProtos.SnapshotDescription snapshotDescription =
          SnapshotDescriptionUtils.readSnapshotInfo(fs, manifestDir);
        SnapshotManifest manifest =
          SnapshotManifest.open(conf, fs, manifestDir, snapshotDescription);

        ColumnFamilyDescriptor[] backupCfs = manifest.getTableDescriptor().getColumnFamilies();
        if (!areCfsCompatible(currentCfs, backupCfs)) {
          exBuilder.addMismatchedTable(tn, currentCfs, backupCfs);
        }
      }
    }

    ColumnFamilyMismatchException ex = exBuilder.build();
    if (!ex.getMismatchedTables().isEmpty()) {
      throw ex;
    }
  }

  private static boolean areCfsCompatible(ColumnFamilyDescriptor[] currentCfs,
    ColumnFamilyDescriptor[] backupCfs) {
    if (currentCfs.length != backupCfs.length) {
      return false;
    }

    for (int i = 0; i < backupCfs.length; i++) {
      String currentCf = currentCfs[i].getNameAsString();
      String backupCf = backupCfs[i].getNameAsString();

      if (!currentCf.equals(backupCf)) {
        return false;
      }
    }

    return true;
  }
}