/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.master;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

/**
 * Implementation of a log cleaner that checks if a log is still scheduled for incremental backup
 * before deleting it when its TTL is over.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class BackupLogCleaner extends BaseLogCleanerDelegate {
  private static final Logger LOG = LoggerFactory.getLogger(BackupLogCleaner.class);

  private boolean stopped = false;
  private Connection conn;

  public BackupLogCleaner() {
  }

  @Override
  public void init(Map<String, Object> params) {
    MasterServices master = (MasterServices) MapUtils.getObject(params, HMaster.MASTER);
    if (master != null) {
      conn = master.getConnection();
      if (getConf() == null) {
        super.setConf(conn.getConfiguration());
      }
    }
    if (conn == null) {
      try {
        conn = ConnectionFactory.createConnection(getConf());
      } catch (IOException ioe) {
        throw new RuntimeException("Failed to create connection", ioe);
      }
    }
  }

  /**
   * Calculates the timestamp boundary up to which all backup roots have already included the WAL.
   * I.e. WALs with a lower (= older) or equal timestamp are no longer needed for future
   * incremental backups.
   */
  private Map<Address, Long> serverToPreservationBoundaryTs(List<BackupInfo> backups)
    throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Cleaning WALs if they are older than the WAL cleanup time-boundary. "
          + "Checking WALs against {} backups: {}",
        backups.size(),
        backups.stream().map(BackupInfo::getBackupId).sorted().collect(Collectors.joining(", ")));
    }

    // This map tracks, for every backup root, the most recently created backup (= highest
    // timestamp)
    Map<String, BackupInfo> newestBackupPerRootDir = new HashMap<>();
    for (BackupInfo backup : backups) {
      BackupInfo existingEntry = newestBackupPerRootDir.get(backup.getBackupRootDir());
      if (existingEntry == null || existingEntry.getStartTs() < backup.getStartTs()) {
        newestBackupPerRootDir.put(backup.getBackupRootDir(), backup);
      }
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("WAL cleanup time-boundary using info from: {}.",
        newestBackupPerRootDir.entrySet().stream()
          .map(e -> "Backup root " + e.getKey() + ": " + e.getValue().getBackupId()).sorted()
          .collect(Collectors.joining(", ")));
    }

    // This map tracks, for every RegionServer, the least recent (= oldest / lowest timestamp)
    // inclusion in any backup. In other words, it is the timestamp boundary up to which all backup
    // roots have included the WAL in their backup.
    Map<Address, Long> boundaries = new HashMap<>();
    for (BackupInfo backupInfo : newestBackupPerRootDir.values()) {
      // Iterate over all tables in the timestamp map, which contains all tables covered in the
      // backup root, not just the tables included in that specific backup (which could be a
      // subset)
      for (TableName table : backupInfo.getTableSetTimestampMap().keySet()) {
        for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
          .entrySet()) {
          Address address = Address.fromString(entry.getKey());
          Long storedTs = boundaries.get(address);
          if (storedTs == null || entry.getValue() < storedTs) {
            boundaries.put(address, entry.getValue());
          }
        }
      }
    }

    if (LOG.isDebugEnabled()) {
      for (Map.Entry<Address, Long> entry : boundaries.entrySet()) {
        LOG.debug("Server: {}, WAL cleanup boundary: {}", entry.getKey().getHostName(),
          entry.getValue());
      }
    }

    return boundaries;
  }

  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    List<FileStatus> filteredFiles = new ArrayList<>();

    // all members of this class are null if backup is disabled,
    // so we cannot filter the files
    if (this.getConf() == null || !BackupManager.isBackupEnabled(getConf())) {
      LOG.debug("Backup is not enabled. Check your {} setting",
        BackupRestoreConstants.BACKUP_ENABLE_KEY);
      return files;
    }

    Map<Address, Long> serverToPreservationBoundaryTs;
    try {
      try (BackupManager backupManager = new BackupManager(conn, getConf())) {
        serverToPreservationBoundaryTs =
          serverToPreservationBoundaryTs(backupManager.getBackupHistory(true));
      }
    } catch (IOException ex) {
      LOG.error("Failed to analyse backup history with exception: {}. Retaining all logs",
        ex.getMessage(), ex);
      return Collections.emptyList();
    }
    for (FileStatus file : files) {
      if (canDeleteFile(serverToPreservationBoundaryTs, file.getPath())) {
        filteredFiles.add(file);
      }
    }

    LOG.info("Total files: {}, Filtered Files: {}", IterableUtils.size(files),
      filteredFiles.size());
    return filteredFiles;
  }

  @Override
  public void setConf(Configuration config) {
    // If backup is disabled, keep all members null
    super.setConf(config);
    if (
      !config.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
        BackupRestoreConstants.BACKUP_ENABLE_DEFAULT)
    ) {
      LOG.warn("Backup is disabled - allowing all wals to be deleted");
    }
  }

  @Override
  public void stop(String why) {
    if (!this.stopped) {
      this.stopped = true;
      LOG.info("Stopping BackupLogCleaner");
    }
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  protected static boolean canDeleteFile(Map<Address, Long> addressToBoundaryTs, Path path) {
    if (isHMasterWAL(path)) {
      return true;
    }

    try {
      String hostname = BackupUtils.parseHostNameFromLogFile(path);
      if (hostname == null) {
        LOG.warn(
          "Cannot parse hostname from RegionServer WAL file: {}. Ignoring cleanup of this log",
          path);
        return false;
      }
      Address walServerAddress = Address.fromString(hostname);
      long walTimestamp = AbstractFSWALProvider.getTimestamp(path.getName());

      if (!addressToBoundaryTs.containsKey(walServerAddress)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("No cleanup WAL time-boundary found for server: {}. Ok to delete file: {}",
            walServerAddress.getHostName(), path);
        }
        return true;
      }

      Long backupBoundary = addressToBoundaryTs.get(walServerAddress);
      if (backupBoundary >= walTimestamp) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "WAL cleanup time-boundary found for server {}: {}. Ok to delete older file: {}",
            walServerAddress.getHostName(), backupBoundary, path);
        }
        return true;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("WAL cleanup time-boundary found for server {}: {}. Keeping younger file: {}",
          walServerAddress.getHostName(), backupBoundary, path);
      }
    } catch (Exception ex) {
      LOG.warn("Error occurred while filtering file: {}. Ignoring cleanup of this log", path, ex);
      return false;
    }
    return false;
  }

  private static boolean isHMasterWAL(Path path) {
    String fn = path.getName();
    return fn.startsWith(WALProcedureStore.LOG_PREFIX)
      || fn.endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX);
  }
}
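
// A minimal usage sketch (illustrative only, not part of the upstream class): this delegate has
// an effect only when it is registered with the master's LogCleaner chore and backup is enabled.
// Assuming the standard plugin property "hbase.master.logcleaner.plugins", a configuration along
// these lines would wire it in:
//
//   Configuration conf = HBaseConfiguration.create();
//   conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
//   conf.set("hbase.master.logcleaner.plugins",
//     conf.get("hbase.master.logcleaner.plugins") + "," + BackupLogCleaner.class.getName());
//
// On each sweep the cleaner chore calls getDeletableFiles(), so archived RegionServer WALs whose
// timestamp is newer than the per-server preservation boundary are kept even after their TTL has
// expired, while HMaster procedure/region WALs are always considered deletable.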