/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.master;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

/**
 * Implementation of a log cleaner that checks if a log is still scheduled for incremental backup
 * before deleting it when its TTL is over.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class BackupLogCleaner extends BaseLogCleanerDelegate {
  private static final Logger LOG = LoggerFactory.getLogger(BackupLogCleaner.class);

  private boolean stopped = false;
  private Connection conn;

  public BackupLogCleaner() {
  }

  @Override
  public void init(Map<String, Object> params) {
    MasterServices master = (MasterServices) MapUtils.getObject(params, HMaster.MASTER);
    if (master != null) {
      conn = master.getConnection();
      if (getConf() == null) {
        super.setConf(conn.getConfiguration());
      }
    }
    if (conn == null) {
      try {
        conn = ConnectionFactory.createConnection(getConf());
      } catch (IOException ioe) {
        throw new RuntimeException("Failed to create connection", ioe);
      }
    }
  }

  private Map<Address, Long> getServersToOldestBackupMapping(List<BackupInfo> backups)
    throws IOException {
    Map<Address, Long> serverAddressToLastBackupMap = new HashMap<>();

    Map<TableName, Long> tableNameBackupInfoMap = new HashMap<>();
    for (BackupInfo backupInfo : backups) {
      for (TableName table : backupInfo.getTables()) {
        tableNameBackupInfoMap.putIfAbsent(table, backupInfo.getStartTs());
        if (tableNameBackupInfoMap.get(table) <= backupInfo.getStartTs()) {
          tableNameBackupInfoMap.put(table, backupInfo.getStartTs());
          for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
            .entrySet()) {
            serverAddressToLastBackupMap.put(Address.fromString(entry.getKey()), entry.getValue());
          }
        }
      }
    }

    return serverAddressToLastBackupMap;
  }

  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    List<FileStatus> filteredFiles = new ArrayList<>();

    // all members of this class are null if backup is disabled,
    // so we cannot filter the files
    if (this.getConf() == null || !BackupManager.isBackupEnabled(getConf())) {
      LOG.debug("Backup is not enabled. Check your {} setting",
        BackupRestoreConstants.BACKUP_ENABLE_KEY);
      return files;
    }

    Map<Address, Long> addressToLastBackupMap;
    try {
      try (BackupManager backupManager = new BackupManager(conn, getConf())) {
        addressToLastBackupMap =
          getServersToOldestBackupMapping(backupManager.getBackupHistory(true));
      }
    } catch (IOException ex) {
      LOG.error("Failed to analyse backup history with exception: {}. Retaining all logs",
        ex.getMessage(), ex);
      return Collections.emptyList();
    }
    for (FileStatus file : files) {
      String fn = file.getPath().getName();
      if (fn.startsWith(WALProcedureStore.LOG_PREFIX)) {
        filteredFiles.add(file);
        continue;
      }

      try {
        Address walServerAddress =
          Address.fromString(BackupUtils.parseHostNameFromLogFile(file.getPath()));
        long walTimestamp = WAL.getTimestamp(file.getPath().getName());

        if (
          !addressToLastBackupMap.containsKey(walServerAddress)
            || addressToLastBackupMap.get(walServerAddress) >= walTimestamp
        ) {
          filteredFiles.add(file);
        }
      } catch (Exception ex) {
        LOG.warn(
          "Error occurred while filtering file: {} with error: {}. Ignoring cleanup of this log",
          file.getPath(), ex.getMessage());
      }
    }

    LOG.info("Total files: {}, Filtered Files: {}", IterableUtils.size(files),
      filteredFiles.size());
    return filteredFiles;
  }

  @Override
  public void setConf(Configuration config) {
    // If backup is disabled, keep all members null
    super.setConf(config);
    if (
      !config.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
        BackupRestoreConstants.BACKUP_ENABLE_DEFAULT)
    ) {
      LOG.warn("Backup is disabled - allowing all wals to be deleted");
    }
  }

  @Override
  public void stop(String why) {
    if (!this.stopped) {
      this.stopped = true;
      LOG.info("Stopping BackupLogCleaner");
    }
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }
}