/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

/**
 * The class RSMobFileCleanerChore for running cleaner regularly to remove the obsolete (files which
 * have no active references to) mob files that were referenced from the current RS.
 * <p>
 * Each run first collects, per column family and region hosted by this region server, the names of
 * the MOB files referenced by the region's store files (plus files currently being written). It
 * then lists the MOB family directory and archives every file that carries the encoded name of one
 * of those regions, is not in the collected reference set and is older than the configured minimum
 * age.
 */
@InterfaceAudience.Private
public class RSMobFileCleanerChore extends ScheduledChore {

  private static final Logger LOG = LoggerFactory.getLogger(RSMobFileCleanerChore.class);
  private final HRegionServer rs;

  /**
   * Creates the chore for the given region server. The period is read from
   * {@link MobConstants#MOB_CLEANER_PERIOD} (in seconds). To prevent a load spike on the fs the
   * initial delay is the period modified by +/- 50%.
   * @param rs the region server whose regions are inspected
   */
  public RSMobFileCleanerChore(HRegionServer rs) {
    super(rs.getServerName() + "-MobFileCleanerChore", rs,
      rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
        MobConstants.DEFAULT_MOB_CLEANER_PERIOD),
      Math.round(rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
        MobConstants.DEFAULT_MOB_CLEANER_PERIOD)
        * ((ThreadLocalRandom.current().nextDouble() + 0.5D))),
      TimeUnit.SECONDS);
    this.rs = rs;
  }

  /**
   * Creates a chore that is not bound to a region server. {@link #chore()} dereferences the region
   * server immediately, so an instance built this way cannot run the cleaning cycle.
   * NOTE(review): presumably intended for tests that only call {@link #archiveMobFiles} - confirm.
   */
  public RSMobFileCleanerChore() {
    this.rs = null;
  }

  /**
   * Runs one cleaning cycle over all tables known to the region server.
   * <p>
   * The cycle is aborted (nothing more is archived) when a store file without MOB reference
   * metadata and without a bulkload marker is found, or when the file system cannot be accessed.
   * If the references of a single store cannot be collected, MOB files belonging to that
   * family/region are left untouched for this cycle.
   */
  @Override
  protected void chore() {
    Configuration conf = rs.getConfiguration();
    long minAgeToArchive = conf.getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY,
      MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
    // We check only those MOB files whose creation time is less than
    // maxCreationTimeToArchive, i.e. current time - minAgeToArchive. The gap
    // gives us confidence that all corresponding store files will
    // exist at the time cleaning procedure begins and will be examined.
    // So, if MOB file creation time is greater than this maxCreationTimeToArchive,
    // it will be skipped and won't be archived.
    long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - minAgeToArchive;

    TableDescriptors htds = rs.getTableDescriptors();
    try {
      FileSystem fs = FileSystem.get(conf);

      Map<String, TableDescriptor> map = null;
      try {
        map = htds.getAll();
      } catch (IOException e) {
        LOG.error("MobFileCleanerChore failed", e);
        return;
      }
      // family name -> encoded region name -> names of referenced MOB files.
      // This map is shared by all tables processed in this cycle.
      Map<String, Map<String, Set<String>>> referencedMOBs = new HashMap<>();
      for (TableDescriptor htd : map.values()) {
        // Now clean obsolete files for a table
        LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
        List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
        List<HRegion> regions = rs.getRegions(htd.getTableName());
        // family name -> encoded names of regions whose store files could not be fully scanned.
        // Nothing may be archived for these stores: their reference set is unknown.
        Map<String, Set<String>> unscannedStores = new HashMap<>();
        for (HRegion region : regions) {
          String encodedRegionName = region.getRegionInfo().getEncodedName();
          for (ColumnFamilyDescriptor hcd : list) {
            String familyName = hcd.getNameAsString();
            HStore store = region.getStore(hcd.getName());
            Collection<HStoreFile> sfs = store.getStorefiles();
            Set<String> regionMobs = new HashSet<>();
            Path currentPath = null;
            try {
              // collecting referenced MOBs
              for (HStoreFile sf : sfs) {
                currentPath = sf.getPath();
                byte[] mobRefData;
                byte[] bulkloadMarkerData;
                // Store files of an online store usually have an open reader that is shared with
                // running scans. Only close the reader if it was opened here, and hold the store
                // file's monitor so nobody else picks up the reader we are about to close.
                synchronized (sf) {
                  boolean readerOpenedHere = sf.getReader() == null;
                  sf.initReader();
                  mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
                  bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
                  if (readerOpenedHere) {
                    // close store file to avoid memory leaks
                    sf.closeStoreFile(true);
                  }
                }
                if (mobRefData == null) {
                  if (bulkloadMarkerData == null) {
                    LOG.warn(
                      "Found old store file with no MOB_FILE_REFS: {} - "
                        + "can not proceed until all old files will be MOB-compacted.",
                      currentPath);
                    return;
                  } else {
                    LOG.debug("Skipping file without MOB references (bulkloaded file):{}",
                      currentPath);
                    continue;
                  }
                }
                // file may or may not have MOB references, but was created by the distributed
                // mob compaction code.
                try {
                  SetMultimap<TableName, String> mobs =
                    MobUtils.deserializeMobFileRefs(mobRefData).build();
                  LOG.debug("Found {} mob references for store={}", mobs.size(), sf);
                  LOG.trace("Specific mob references found for store={} : {}", sf, mobs);
                  regionMobs.addAll(mobs.values());
                } catch (RuntimeException exception) {
                  throw new IOException("failure getting mob references for hfile " + sf,
                    exception);
                }
              }
              // collecting files, MOB included currently being written
              regionMobs.addAll(store.getStoreFilesBeingWritten().stream().map(Path::getName)
                .collect(Collectors.toList()));

              referencedMOBs.computeIfAbsent(familyName, cf -> new HashMap<>())
                .computeIfAbsent(encodedRegionName, name -> new HashSet<>()).addAll(regionMobs);

            } catch (FileNotFoundException e) {
              LOG.warn("Missing file:{} Skipping MOB cleaning for this store in the current cycle"
                + " due to error", currentPath, e);
              unscannedStores.computeIfAbsent(familyName, cf -> new HashSet<>())
                .add(encodedRegionName);
            } catch (IOException e) {
              LOG.error("Failed to clean the obsolete mob files for table={}",
                htd.getTableName().getNameAsString(), e);
              unscannedStores.computeIfAbsent(familyName, cf -> new HashSet<>())
                .add(encodedRegionName);
            }
          }
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("Found: {} active mob refs for table={}",
            referencedMOBs.values().stream().flatMap(inner -> inner.values().stream())
              .mapToInt(Set::size).sum(),
            htd.getTableName().getNameAsString());
        }
        if (LOG.isTraceEnabled()) {
          referencedMOBs.values().forEach(
            innerMap -> innerMap.values().forEach(mobFiles -> mobFiles.forEach(LOG::trace)));
        }

        // collect regions referencing MOB files belonging to the current rs
        Set<String> regionsCovered = new HashSet<>();
        referencedMOBs.values().forEach(regionMap -> regionsCovered.addAll(regionMap.keySet()));

        for (ColumnFamilyDescriptor hcd : list) {
          List<Path> toArchive = new ArrayList<>();
          String family = hcd.getNameAsString();
          Path dir = MobUtils.getMobFamilyPath(conf, htd.getTableName(), family);
          // both lookups only depend on the family, so do them once per directory listing
          Map<String, Set<String>> cfMobs = referencedMOBs.get(family);
          Set<String> unscannedRegions = unscannedStores.get(family);
          RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
          while (rit.hasNext()) {
            LocatedFileStatus lfs = rit.next();
            Path p = lfs.getPath();
            // the text after the last '_' of the file name is taken as the encoded region name
            String[] mobParts = p.getName().split("_");
            String regionName = mobParts[mobParts.length - 1];

            // skip MOB files not belonging to a region assigned to the current rs
            if (!regionsCovered.contains(regionName)) {
              LOG.trace("MOB file does not belong to current rs: {}", p);
              continue;
            }

            // the references of this store are unknown, so the file may still be in use
            if (unscannedRegions != null && unscannedRegions.contains(regionName)) {
              LOG.trace("Skipping MOB file of a store whose references were not collected: {}",
                p);
              continue;
            }

            // check active or actively written mob files
            Set<String> regionRefs = cfMobs == null ? null : cfMobs.get(regionName);
            if (regionRefs != null && regionRefs.contains(p.getName())) {
              LOG.trace("Keeping active MOB file: {}", p);
              continue;
            }

            // MOB is not in a list of active references, but it can be too
            // fresh, skip it in this case. The modification time comes with the
            // listing, so no additional file status lookup is needed.
            long creationTime = lfs.getModificationTime();
            if (creationTime < maxCreationTimeToArchive) {
              LOG.trace("Archiving MOB file {} creation time={}", p, creationTime);
              toArchive.add(p);
            } else {
              LOG.trace("Skipping fresh file: {}. Creation time={}", p, creationTime);
            }

          }
          LOG.info(" MOB Cleaner found {} files to archive for table={} family={}",
            toArchive.size(), htd.getTableName().getNameAsString(), family);
          // pass the family name bytes from the descriptor; no charset conversion involved
          archiveMobFiles(conf, htd.getTableName(), hcd.getName(), toArchive);
          LOG.info(" MOB Cleaner archived {} files, table={} family={}", toArchive.size(),
            htd.getTableName().getNameAsString(), family);
        }

        LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());

      }
    } catch (IOException e) {
      LOG.error("MOB Cleaner failed when trying to access the file system", e);
    }
  }

  /**
   * Archives the mob files.
   * @param conf       The current configuration.
   * @param tableName  The table name.
   * @param family     The name of the column family.
   * @param storeFiles The files to be archived. Nothing is done when the list is empty.
   * @throws IOException if resolving the file system or archiving a file fails
   */
  public void archiveMobFiles(Configuration conf, TableName tableName, byte[] family,
    List<Path> storeFiles) throws IOException {

    if (storeFiles.isEmpty()) {
      // nothing to remove
      LOG.debug("Skipping archiving old MOB files - no files found for table={} cf={}", tableName,
        Bytes.toString(family));
      return;
    }
    Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
    // all files are resolved against the file system of the first path
    FileSystem fs = storeFiles.get(0).getFileSystem(conf);

    for (Path p : storeFiles) {
      LOG.debug("MOB Cleaner is archiving: {}", p);
      HFileArchiver.archiveStoreFile(conf, fs, MobUtils.getMobRegionInfo(tableName), mobTableDir,
        family, p);
    }
  }
}