001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.master; 019 020import java.io.IOException; 021import java.util.Collections; 022import java.util.Map; 023import java.util.Set; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FileStatus; 026import org.apache.hadoop.hbase.HBaseInterfaceAudience; 027import org.apache.hadoop.hbase.master.HMaster; 028import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; 029import org.apache.hadoop.hbase.replication.ReplicationException; 030import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 031import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 032import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 033import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hbase.thirdparty.com.google.common.base.Predicate; 039import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 040import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; 041 042/** 043 * Implementation of a log cleaner that checks if a log is still scheduled for replication before 044 * deleting it when its TTL is over. 045 */ 046@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 047public class ReplicationLogCleaner extends BaseLogCleanerDelegate { 048 private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class); 049 private ZKWatcher zkw = null; 050 private boolean shareZK = false; 051 private ReplicationQueueStorage queueStorage; 052 private boolean stopped = false; 053 private Set<String> wals; 054 private long readZKTimestamp = 0; 055 056 @Override 057 public void preClean() { 058 readZKTimestamp = EnvironmentEdgeManager.currentTime(); 059 try { 060 // The concurrently created new WALs may not be included in the return list, 061 // but they won't be deleted because they're not in the checking set. 062 wals = queueStorage.getAllWALs(); 063 } catch (ReplicationException e) { 064 LOG.warn("Failed to read zookeeper, skipping checking deletable files"); 065 wals = null; 066 } 067 } 068 069 @Override 070 public void postClean() { 071 // release memory 072 wals = null; 073 } 074 075 @Override 076 public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) { 077 // all members of this class are null if replication is disabled, 078 // so we cannot filter the files 079 if (this.getConf() == null) { 080 return files; 081 } 082 083 if (wals == null) { 084 return Collections.emptyList(); 085 } 086 return Iterables.filter(files, new Predicate<FileStatus>() { 087 @Override 088 public boolean apply(FileStatus file) { 089 // just for overriding the findbugs NP warnings, as the parameter is marked as Nullable in 090 // the guava Predicate. 091 if (file == null) { 092 return false; 093 } 094 String wal = file.getPath().getName(); 095 boolean logInReplicationQueue = wals.contains(wal); 096 if (logInReplicationQueue) { 097 LOG.debug("Found up in ZooKeeper, NOT deleting={}", wal); 098 } 099 return !logInReplicationQueue && (file.getModificationTime() < readZKTimestamp); 100 } 101 }); 102 } 103 104 @Override 105 public void init(Map<String, Object> params) { 106 super.init(params); 107 try { 108 if (MapUtils.isNotEmpty(params)) { 109 Object master = params.get(HMaster.MASTER); 110 if (master != null && master instanceof HMaster) { 111 zkw = ((HMaster) master).getZooKeeper(); 112 shareZK = true; 113 } 114 } 115 if (zkw == null) { 116 zkw = new ZKWatcher(getConf(), "replicationLogCleaner", null); 117 } 118 this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); 119 } catch (IOException e) { 120 LOG.error("Error while configuring " + this.getClass().getName(), e); 121 } 122 } 123 124 @InterfaceAudience.Private 125 public void setConf(Configuration conf, ZKWatcher zk) { 126 super.setConf(conf); 127 try { 128 this.zkw = zk; 129 this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zk, conf); 130 } catch (Exception e) { 131 LOG.error("Error while configuring " + this.getClass().getName(), e); 132 } 133 } 134 135 @Override 136 public void stop(String why) { 137 if (this.stopped) return; 138 this.stopped = true; 139 if (!shareZK && this.zkw != null) { 140 LOG.info("Stopping " + this.zkw); 141 this.zkw.close(); 142 } 143 } 144 145 @Override 146 public boolean isStopped() { 147 return this.stopped; 148 } 149}