/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.master;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

/**
 * Implementation of a log cleaner that checks whether a WAL is still needed for a future
 * incremental backup before allowing its deletion once its TTL is over.
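 * <p>
 * Registration is normally handled by the backup subsystem when backups are enabled; the
 * following is only an illustrative sketch of the equivalent manual wiring (the property names
 * shown are the standard HBase configuration keys):
 * </p>
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * conf.setBoolean("hbase.backup.enable", true);
 * // Append this cleaner to the master's log-cleaner plugin chain
 * conf.set("hbase.master.logcleaner.plugins",
 *   conf.get("hbase.master.logcleaner.plugins") + "," + BackupLogCleaner.class.getName());
 * }</pre>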
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class BackupLogCleaner extends BaseLogCleanerDelegate {
  private static final Logger LOG = LoggerFactory.getLogger(BackupLogCleaner.class);

  private boolean stopped = false;
  private Connection conn;

  public BackupLogCleaner() {
  }

  @Override
  public void init(Map<String, Object> params) {
    MasterServices master = (MasterServices) MapUtils.getObject(params, HMaster.MASTER);
    if (master != null) {
      conn = master.getConnection();
      if (getConf() == null) {
        super.setConf(conn.getConfiguration());
      }
    }
    // Fall back to creating a dedicated connection if no master reference was provided
    if (conn == null) {
      try {
        conn = ConnectionFactory.createConnection(getConf());
      } catch (IOException ioe) {
        throw new RuntimeException("Failed to create connection", ioe);
      }
    }
  }

  /**
   * Calculates, per region server, the timestamp boundary up to which all backup roots have
   * already included that server's WALs. WALs with a lower (= older) or equal timestamp are no
   * longer needed for future incremental backups.
   * @param backups the history of backups to consider
   * @return a map from region server address to its WAL preservation boundary timestamp
   */
  private Map<Address, Long> serverToPreservationBoundaryTs(List<BackupInfo> backups)
    throws IOException {
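    // Worked example with hypothetical values: backup root A's newest backup covered server s1
    // up to ts=100 and s2 up to ts=200, while root B's newest backup covered s1 up to ts=150 and
    // s2 up to ts=120. The resulting boundaries are the per-server minima {s1=100, s2=120}: only
    // WALs at or below those timestamps have been captured by every backup root.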
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Cleaning WALs if they are older than the WAL cleanup time-boundary. "
          + "Checking WALs against {} backups: {}",
        backups.size(),
        backups.stream().map(BackupInfo::getBackupId).sorted().collect(Collectors.joining(", ")));
    }

    // This map tracks, for every backup root, the most recently created backup (= highest
    // timestamp)
    Map<String, BackupInfo> newestBackupPerRootDir = new HashMap<>();
    for (BackupInfo backup : backups) {
      BackupInfo existingEntry = newestBackupPerRootDir.get(backup.getBackupRootDir());
      if (existingEntry == null || existingEntry.getStartTs() < backup.getStartTs()) {
        newestBackupPerRootDir.put(backup.getBackupRootDir(), backup);
      }
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("WAL cleanup time-boundary using info from: {}. ",
        newestBackupPerRootDir.entrySet().stream()
          .map(e -> "Backup root " + e.getKey() + ": " + e.getValue().getBackupId()).sorted()
          .collect(Collectors.joining(", ")));
    }

    // This map tracks, for every RegionServer, the least recent (= oldest / lowest timestamp)
    // inclusion in any backup. In other words, it is the timestamp boundary up to which all backup
    // roots have included the WAL in their backup.
    Map<Address, Long> boundaries = new HashMap<>();
    for (BackupInfo backupInfo : newestBackupPerRootDir.values()) {
      // Iterate over all tables in the timestamp map, which contains all tables covered in the
      // backup root, not just the tables included in that specific backup (which could be a subset)
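      // (Hypothetical example: if a full backup in this root covered tables t1 and t2 but the
      // newest incremental covered only t1, the timestamp map still lists both t1 and t2.)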
      for (TableName table : backupInfo.getTableSetTimestampMap().keySet()) {
        for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
          .entrySet()) {
          Address address = Address.fromString(entry.getKey());
          Long storedTs = boundaries.get(address);
          if (storedTs == null || entry.getValue() < storedTs) {
            boundaries.put(address, entry.getValue());
          }
        }
      }
    }

    if (LOG.isDebugEnabled()) {
      for (Map.Entry<Address, Long> entry : boundaries.entrySet()) {
        LOG.debug("Server: {}, WAL cleanup boundary: {}", entry.getKey().getHostName(),
          entry.getValue());
      }
    }

    return boundaries;
  }

  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    List<FileStatus> filteredFiles = new ArrayList<>();

    // All members of this class are null if backup is disabled, so we cannot filter the files;
    // in that case every WAL is passed through as deletable.
    if (this.getConf() == null || !BackupManager.isBackupEnabled(getConf())) {
      LOG.debug("Backup is not enabled. Check your {} setting",
        BackupRestoreConstants.BACKUP_ENABLE_KEY);
      return files;
    }

    Map<Address, Long> serverToPreservationBoundaryTs;
    try {
      try (BackupManager backupManager = new BackupManager(conn, getConf())) {
        serverToPreservationBoundaryTs =
          serverToPreservationBoundaryTs(backupManager.getBackupHistory(true));
      }
    } catch (IOException ex) {
      LOG.error("Failed to analyse backup history with exception: {}. Retaining all logs",
        ex.getMessage(), ex);
      // Fail safe: when the backup history cannot be read, report nothing as deletable
      return Collections.emptyList();
    }
    for (FileStatus file : files) {
      if (canDeleteFile(serverToPreservationBoundaryTs, file.getPath())) {
        filteredFiles.add(file);
      }
    }

    LOG.info("Total files: {}, Filtered Files: {}", IterableUtils.size(files),
      filteredFiles.size());
    return filteredFiles;
  }

  @Override
  public void setConf(Configuration config) {
    super.setConf(config);
    // If backup is disabled, the members of this class stay null and all WALs become deletable
    if (
      !config.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
        BackupRestoreConstants.BACKUP_ENABLE_DEFAULT)
    ) {
      LOG.warn("Backup is disabled - allowing all WALs to be deleted");
    }
  }

  @Override
  public void stop(String why) {
    if (!this.stopped) {
      this.stopped = true;
      LOG.info("Stopping BackupLogCleaner");
    }
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  /**
   * Determines whether the given WAL file can be deleted without breaking future incremental
   * backups: HMaster-owned WALs are always deletable, while region server WALs are deletable only
   * once every backup root has included them.
   */
  protected static boolean canDeleteFile(Map<Address, Long> addressToBoundaryTs, Path path) {
    if (isHMasterWAL(path)) {
      return true;
    }
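
    // Illustrative example (hypothetical file name): for an archived region server WAL named
    // "host-a.example.com%2C16020%2C1610000000000.1610000123456", the parsed server address is
    // host-a.example.com:16020 and the WAL timestamp is 1610000123456; deletion is allowed only
    // if that server's boundary is at or above that timestamp.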

    try {
      String hostname = BackupUtils.parseHostNameFromLogFile(path);
      if (hostname == null) {
        LOG.warn(
          "Cannot parse hostname from RegionServer WAL file: {}. Ignoring cleanup of this log",
          path);
        return false;
      }
      Address walServerAddress = Address.fromString(hostname);
      long walTimestamp = AbstractFSWALProvider.getTimestamp(path.getName());

      if (!addressToBoundaryTs.containsKey(walServerAddress)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("No cleanup WAL time-boundary found for server: {}. Ok to delete file: {}",
            walServerAddress.getHostName(), path);
        }
        return true;
      }

      Long backupBoundary = addressToBoundaryTs.get(walServerAddress);
      if (backupBoundary >= walTimestamp) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "WAL cleanup time-boundary found for server {}: {}. Ok to delete older file: {}",
            walServerAddress.getHostName(), backupBoundary, path);
        }
        return true;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("WAL cleanup time-boundary found for server {}: {}. Keeping younger file: {}",
          walServerAddress.getHostName(), backupBoundary, path);
      }
    } catch (Exception ex) {
      LOG.warn("Error occurred while filtering file: {}. Ignoring cleanup of this log", path, ex);
      return false;
    }
    return false;
  }

  private static boolean isHMasterWAL(Path path) {
    String fn = path.getName();
    // Procedure-store WALs and archived master-region WALs belong to the HMaster itself, are
    // never part of a table backup, and are therefore always deletable
    return fn.startsWith(WALProcedureStore.LOG_PREFIX)
      || fn.endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX);
  }
}