001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.FileStatus;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.Abortable;
031import org.apache.hadoop.hbase.HBaseInterfaceAudience;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
043
044/**
045 * Implementation of a file cleaner that checks if an hfile is still referenced by backup before
046 * deleting it from hfile archive directory.
047 */
048@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
049public class BackupHFileCleaner extends BaseHFileCleanerDelegate implements Abortable {
050  private static final Logger LOG = LoggerFactory.getLogger(BackupHFileCleaner.class);
051  private boolean stopped = false;
052  private boolean aborted;
053  private Configuration conf;
054  private Connection connection;
055  private long prevReadFromBackupTbl = 0, // timestamp of most recent read from backup:system table
056      secondPrevReadFromBackupTbl = 0; // timestamp of 2nd most recent read from backup:system table
057  // used by unit test to skip reading backup:system
058  private boolean checkForFullyBackedUpTables = true;
059  private List<TableName> fullyBackedUpTables = null;
060
061  private Set<String> getFilenameFromBulkLoad(Map<byte[], List<Path>>[] maps) {
062    Set<String> filenames = new HashSet<>();
063    for (Map<byte[], List<Path>> map : maps) {
064      if (map == null) {
065        continue;
066      }
067
068      for (List<Path> paths : map.values()) {
069        for (Path p : paths) {
070          filenames.add(p.getName());
071        }
072      }
073    }
074    return filenames;
075  }
076
077  private Set<String> loadHFileRefs(List<TableName> tableList) throws IOException {
078    if (connection == null) {
079      connection = ConnectionFactory.createConnection(conf);
080    }
081    try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
082      Map<byte[], List<Path>>[] res = tbl.readBulkLoadedFiles(null, tableList);
083      secondPrevReadFromBackupTbl = prevReadFromBackupTbl;
084      prevReadFromBackupTbl = EnvironmentEdgeManager.currentTime();
085      return getFilenameFromBulkLoad(res);
086    }
087  }
088
089  @InterfaceAudience.Private
090  void setCheckForFullyBackedUpTables(boolean b) {
091    checkForFullyBackedUpTables = b;
092  }
093
094  @Override
095  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
096    if (conf == null) {
097      return files;
098    }
099    // obtain the Set of TableName's which have been fully backed up
100    // so that we filter BulkLoad to be returned from server
101    if (checkForFullyBackedUpTables) {
102      if (connection == null) {
103        return files;
104      }
105
106      try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
107        fullyBackedUpTables = new ArrayList<>(tbl.getTablesIncludedInBackups());
108      } catch (IOException ioe) {
109        LOG.error("Failed to get tables which have been fully backed up, skipping checking", ioe);
110        return Collections.emptyList();
111      }
112      Collections.sort(fullyBackedUpTables);
113    }
114    final Set<String> hfileRefs;
115    try {
116      hfileRefs = loadHFileRefs(fullyBackedUpTables);
117    } catch (IOException ioe) {
118      LOG.error("Failed to read hfile references, skipping checking deletable files", ioe);
119      return Collections.emptyList();
120    }
121    Iterable<FileStatus> deletables = Iterables.filter(files, file -> {
122      // If the file is recent, be conservative and wait for one more scan of backup:system table
123      if (file.getModificationTime() > secondPrevReadFromBackupTbl) {
124        return false;
125      }
126      String hfile = file.getPath().getName();
127      boolean foundHFileRef = hfileRefs.contains(hfile);
128      return !foundHFileRef;
129    });
130    return deletables;
131  }
132
133  @Override
134  public boolean isFileDeletable(FileStatus fStat) {
135    // work is done in getDeletableFiles()
136    return true;
137  }
138
139  @Override
140  public void setConf(Configuration config) {
141    this.conf = config;
142    this.connection = null;
143    try {
144      this.connection = ConnectionFactory.createConnection(conf);
145    } catch (IOException ioe) {
146      LOG.error("Couldn't establish connection", ioe);
147    }
148  }
149
150  @Override
151  public void stop(String why) {
152    if (this.stopped) {
153      return;
154    }
155    if (this.connection != null) {
156      try {
157        this.connection.close();
158      } catch (IOException ioe) {
159        LOG.debug("Got " + ioe + " when closing connection");
160      }
161    }
162    this.stopped = true;
163  }
164
165  @Override
166  public boolean isStopped() {
167    return this.stopped;
168  }
169
170  @Override
171  public void abort(String why, Throwable e) {
172    LOG.warn("Aborting ReplicationHFileCleaner because " + why, e);
173    this.aborted = true;
174    stop(why);
175  }
176
177  @Override
178  public boolean isAborted() {
179    return this.aborted;
180  }
181}