001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import java.util.concurrent.ThreadLocalRandom; 030import java.util.concurrent.TimeUnit; 031import java.util.stream.Collectors; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.LocatedFileStatus; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.fs.RemoteIterator; 037import org.apache.hadoop.hbase.ScheduledChore; 038import org.apache.hadoop.hbase.TableDescriptors; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.backup.HFileArchiver; 041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 042import org.apache.hadoop.hbase.client.TableDescriptor; 043import org.apache.hadoop.hbase.regionserver.HRegion; 044import org.apache.hadoop.hbase.regionserver.HRegionServer; 045import org.apache.hadoop.hbase.regionserver.HStore; 046import org.apache.hadoop.hbase.regionserver.HStoreFile; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.apache.hadoop.hbase.util.CommonFSUtils; 049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 050import org.apache.yetus.audience.InterfaceAudience; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap; 055 056/** 057 * The class RSMobFileCleanerChore for running cleaner regularly to remove the obsolete (files which 058 * have no active references to) mob files that were referenced from the current RS. 059 */ 060@InterfaceAudience.Private 061public class RSMobFileCleanerChore extends ScheduledChore { 062 063 private static final Logger LOG = LoggerFactory.getLogger(RSMobFileCleanerChore.class); 064 private final HRegionServer rs; 065 066 public RSMobFileCleanerChore(HRegionServer rs) { 067 super(rs.getServerName() + "-MobFileCleanerChore", rs, 068 rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD, 069 MobConstants.DEFAULT_MOB_CLEANER_PERIOD), 070 Math.round(rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD, 071 MobConstants.DEFAULT_MOB_CLEANER_PERIOD) 072 * ((ThreadLocalRandom.current().nextDouble() + 0.5D))), 073 TimeUnit.SECONDS); 074 // to prevent a load spike on the fs the initial delay is modified by +/- 50% 075 this.rs = rs; 076 } 077 078 public RSMobFileCleanerChore() { 079 this.rs = null; 080 } 081 082 @Override 083 protected void chore() { 084 085 long minAgeToArchive = rs.getConfiguration().getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, 086 MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE); 087 // We check only those MOB files, which creation time is less 088 // than maxCreationTimeToArchive. This is a current time - 1h. 1 hour gap 089 // gives us full confidence that all corresponding store files will 090 // exist at the time cleaning procedure begins and will be examined. 091 // So, if MOB file creation time is greater than this maxTimeToArchive, 092 // this will be skipped and won't be archived. 093 long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - minAgeToArchive; 094 095 TableDescriptors htds = rs.getTableDescriptors(); 096 try { 097 FileSystem fs = FileSystem.get(rs.getConfiguration()); 098 099 Map<String, TableDescriptor> map = null; 100 try { 101 map = htds.getAll(); 102 } catch (IOException e) { 103 LOG.error("MobFileCleanerChore failed", e); 104 return; 105 } 106 Map<String, Map<String, List<String>>> referencedMOBs = new HashMap<>(); 107 for (TableDescriptor htd : map.values()) { 108 // Now clean obsolete files for a table 109 LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName()); 110 List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd); 111 if (list.isEmpty()) { 112 // The table is not MOB table, just skip it 113 continue; 114 } 115 List<HRegion> regions = rs.getRegions(htd.getTableName()); 116 for (HRegion region : regions) { 117 for (ColumnFamilyDescriptor hcd : list) { 118 HStore store = region.getStore(hcd.getName()); 119 Collection<HStoreFile> sfs = store.getStorefiles(); 120 Set<String> regionMobs = new HashSet<String>(); 121 Path currentPath = null; 122 try { 123 // collecting referenced MOBs 124 for (HStoreFile sf : sfs) { 125 currentPath = sf.getPath(); 126 byte[] mobRefData = null; 127 byte[] bulkloadMarkerData = null; 128 if (sf.getReader() == null) { 129 synchronized (sf) { 130 boolean needCreateReader = sf.getReader() == null; 131 sf.initReader(); 132 mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS); 133 bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY); 134 if (needCreateReader) { 135 // close store file to avoid memory leaks 136 sf.closeStoreFile(true); 137 } 138 } 139 } else { 140 mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS); 141 bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY); 142 } 143 144 if (mobRefData == null) { 145 if (bulkloadMarkerData == null) { 146 LOG.warn( 147 "Found old store file with no MOB_FILE_REFS: {} - " 148 + "can not proceed until all old files will be MOB-compacted.", 149 currentPath); 150 return; 151 } else { 152 LOG.debug("Skipping file without MOB references (bulkloaded file):{}", 153 currentPath); 154 continue; 155 } 156 } 157 // file may or may not have MOB references, but was created by the distributed 158 // mob compaction code. 159 try { 160 SetMultimap<TableName, String> mobs = 161 MobUtils.deserializeMobFileRefs(mobRefData).build(); 162 LOG.debug("Found {} mob references for store={}", mobs.size(), sf); 163 LOG.trace("Specific mob references found for store={} : {}", sf, mobs); 164 regionMobs.addAll(mobs.values()); 165 } catch (RuntimeException exception) { 166 throw new IOException("failure getting mob references for hfile " + sf, 167 exception); 168 } 169 } 170 // collecting files, MOB included currently being written 171 regionMobs.addAll(store.getStoreFilesBeingWritten().stream() 172 .map(path -> path.getName()).collect(Collectors.toList())); 173 174 referencedMOBs 175 .computeIfAbsent(hcd.getNameAsString(), cf -> new HashMap<String, List<String>>()) 176 .computeIfAbsent(region.getRegionInfo().getEncodedName(), name -> new ArrayList<>()) 177 .addAll(regionMobs); 178 179 } catch (FileNotFoundException e) { 180 LOG.warn( 181 "Missing file:{} Starting MOB cleaning cycle from the beginning" + " due to error", 182 currentPath, e); 183 regionMobs.clear(); 184 continue; 185 } catch (IOException e) { 186 LOG.error("Failed to clean the obsolete mob files for table={}", 187 htd.getTableName().getNameAsString(), e); 188 } 189 } 190 } 191 192 if (LOG.isDebugEnabled()) { 193 LOG.debug("Found: {} active mob refs for table={}", 194 referencedMOBs.values().stream().map(inner -> inner.values()) 195 .flatMap(lists -> lists.stream()).mapToInt(lists -> lists.size()).sum(), 196 htd.getTableName().getNameAsString()); 197 } 198 if (LOG.isTraceEnabled()) { 199 referencedMOBs.values().stream().forEach(innerMap -> innerMap.values().stream() 200 .forEach(mobFileList -> mobFileList.stream().forEach(LOG::trace))); 201 } 202 203 // collect regions referencing MOB files belonging to the current rs 204 Set<String> regionsCovered = new HashSet<>(); 205 referencedMOBs.values().stream() 206 .forEach(regionMap -> regionsCovered.addAll(regionMap.keySet())); 207 208 for (ColumnFamilyDescriptor hcd : list) { 209 List<Path> toArchive = new ArrayList<Path>(); 210 String family = hcd.getNameAsString(); 211 Path dir = MobUtils.getMobFamilyPath(rs.getConfiguration(), htd.getTableName(), family); 212 RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir); 213 while (rit.hasNext()) { 214 LocatedFileStatus lfs = rit.next(); 215 Path p = lfs.getPath(); 216 String[] mobParts = p.getName().split("_"); 217 String regionName = mobParts[mobParts.length - 1]; 218 219 // skip MOB files not belonging to a region assigned to the current rs 220 if (!regionsCovered.contains(regionName)) { 221 LOG.trace("MOB file does not belong to current rs: {}", p); 222 continue; 223 } 224 225 // check active or actively written mob files 226 Map<String, List<String>> cfMobs = referencedMOBs.get(hcd.getNameAsString()); 227 if ( 228 cfMobs != null && cfMobs.get(regionName) != null 229 && cfMobs.get(regionName).contains(p.getName()) 230 ) { 231 LOG.trace("Keeping active MOB file: {}", p); 232 continue; 233 } 234 235 // MOB is not in a list of active references, but it can be too 236 // fresh, skip it in this case 237 long creationTime = fs.getFileStatus(p).getModificationTime(); 238 if (creationTime < maxCreationTimeToArchive) { 239 LOG.trace("Archiving MOB file {} creation time={}", p, 240 (fs.getFileStatus(p).getModificationTime())); 241 toArchive.add(p); 242 } else { 243 LOG.trace("Skipping fresh file: {}. Creation time={}", p, 244 fs.getFileStatus(p).getModificationTime()); 245 } 246 247 } 248 LOG.info(" MOB Cleaner found {} files to archive for table={} family={}", 249 toArchive.size(), htd.getTableName().getNameAsString(), family); 250 archiveMobFiles(rs.getConfiguration(), htd.getTableName(), family.getBytes(), toArchive); 251 LOG.info(" MOB Cleaner archived {} files, table={} family={}", toArchive.size(), 252 htd.getTableName().getNameAsString(), family); 253 } 254 255 LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName()); 256 257 } 258 } catch (IOException e) { 259 LOG.error("MOB Cleaner failed when trying to access the file system", e); 260 } 261 } 262 263 /** 264 * Archives the mob files. 265 * @param conf The current configuration. 266 * @param tableName The table name. 267 * @param family The name of the column family. 268 * @param storeFiles The files to be archived. 269 * @throws IOException exception 270 */ 271 public void archiveMobFiles(Configuration conf, TableName tableName, byte[] family, 272 List<Path> storeFiles) throws IOException { 273 274 if (storeFiles.size() == 0) { 275 // nothing to remove 276 LOG.debug("Skipping archiving old MOB files - no files found for table={} cf={}", tableName, 277 Bytes.toString(family)); 278 return; 279 } 280 Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), tableName); 281 FileSystem fs = storeFiles.get(0).getFileSystem(conf); 282 283 for (Path p : storeFiles) { 284 LOG.debug("MOB Cleaner is archiving: {}", p); 285 HFileArchiver.archiveStoreFile(conf, fs, MobUtils.getMobRegionInfo(tableName), mobTableDir, 286 family, p); 287 } 288 } 289}