001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.ThreadLocalRandom;
030import java.util.concurrent.TimeUnit;
031import java.util.stream.Collectors;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.LocatedFileStatus;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.fs.RemoteIterator;
037import org.apache.hadoop.hbase.ScheduledChore;
038import org.apache.hadoop.hbase.TableDescriptors;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.backup.HFileArchiver;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
042import org.apache.hadoop.hbase.client.TableDescriptor;
043import org.apache.hadoop.hbase.regionserver.HRegion;
044import org.apache.hadoop.hbase.regionserver.HRegionServer;
045import org.apache.hadoop.hbase.regionserver.HStore;
046import org.apache.hadoop.hbase.regionserver.HStoreFile;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.CommonFSUtils;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
055
056/**
057 * The class RSMobFileCleanerChore for running cleaner regularly to remove the obsolete (files which
058 * have no active references to) mob files that were referenced from the current RS.
059 */
060@InterfaceAudience.Private
061public class RSMobFileCleanerChore extends ScheduledChore {
062
063  private static final Logger LOG = LoggerFactory.getLogger(RSMobFileCleanerChore.class);
064  private final HRegionServer rs;
065
066  public RSMobFileCleanerChore(HRegionServer rs) {
067    super(rs.getServerName() + "-MobFileCleanerChore", rs,
068      rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
069        MobConstants.DEFAULT_MOB_CLEANER_PERIOD),
070      Math.round(rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
071        MobConstants.DEFAULT_MOB_CLEANER_PERIOD)
072        * ((ThreadLocalRandom.current().nextDouble() + 0.5D))),
073      TimeUnit.SECONDS);
074    // to prevent a load spike on the fs the initial delay is modified by +/- 50%
075    this.rs = rs;
076  }
077
078  public RSMobFileCleanerChore() {
079    this.rs = null;
080  }
081
082  @Override
083  protected void chore() {
084
085    long minAgeToArchive = rs.getConfiguration().getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY,
086      MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
087    // We check only those MOB files, which creation time is less
088    // than maxCreationTimeToArchive. This is a current time - 1h. 1 hour gap
089    // gives us full confidence that all corresponding store files will
090    // exist at the time cleaning procedure begins and will be examined.
091    // So, if MOB file creation time is greater than this maxTimeToArchive,
092    // this will be skipped and won't be archived.
093    long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - minAgeToArchive;
094
095    TableDescriptors htds = rs.getTableDescriptors();
096    try {
097      FileSystem fs = FileSystem.get(rs.getConfiguration());
098
099      Map<String, TableDescriptor> map = null;
100      try {
101        map = htds.getAll();
102      } catch (IOException e) {
103        LOG.error("MobFileCleanerChore failed", e);
104        return;
105      }
106      Map<String, Map<String, List<String>>> referencedMOBs = new HashMap<>();
107      for (TableDescriptor htd : map.values()) {
108        // Now clean obsolete files for a table
109        LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
110        List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
111        if (list.isEmpty()) {
112          // The table is not MOB table, just skip it
113          continue;
114        }
115        List<HRegion> regions = rs.getRegions(htd.getTableName());
116        for (HRegion region : regions) {
117          for (ColumnFamilyDescriptor hcd : list) {
118            HStore store = region.getStore(hcd.getName());
119            Collection<HStoreFile> sfs = store.getStorefiles();
120            Set<String> regionMobs = new HashSet<String>();
121            Path currentPath = null;
122            try {
123              // collecting referenced MOBs
124              for (HStoreFile sf : sfs) {
125                currentPath = sf.getPath();
126                byte[] mobRefData = null;
127                byte[] bulkloadMarkerData = null;
128                if (sf.getReader() == null) {
129                  synchronized (sf) {
130                    boolean needCreateReader = sf.getReader() == null;
131                    sf.initReader();
132                    mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
133                    bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
134                    if (needCreateReader) {
135                      // close store file to avoid memory leaks
136                      sf.closeStoreFile(true);
137                    }
138                  }
139                } else {
140                  mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
141                  bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
142                }
143
144                if (mobRefData == null) {
145                  if (bulkloadMarkerData == null) {
146                    LOG.warn(
147                      "Found old store file with no MOB_FILE_REFS: {} - "
148                        + "can not proceed until all old files will be MOB-compacted.",
149                      currentPath);
150                    return;
151                  } else {
152                    LOG.debug("Skipping file without MOB references (bulkloaded file):{}",
153                      currentPath);
154                    continue;
155                  }
156                }
157                // file may or may not have MOB references, but was created by the distributed
158                // mob compaction code.
159                try {
160                  SetMultimap<TableName, String> mobs =
161                    MobUtils.deserializeMobFileRefs(mobRefData).build();
162                  LOG.debug("Found {} mob references for store={}", mobs.size(), sf);
163                  LOG.trace("Specific mob references found for store={} : {}", sf, mobs);
164                  regionMobs.addAll(mobs.values());
165                } catch (RuntimeException exception) {
166                  throw new IOException("failure getting mob references for hfile " + sf,
167                    exception);
168                }
169              }
170              // collecting files, MOB included currently being written
171              regionMobs.addAll(store.getStoreFilesBeingWritten().stream()
172                .map(path -> path.getName()).collect(Collectors.toList()));
173
174              referencedMOBs
175                .computeIfAbsent(hcd.getNameAsString(), cf -> new HashMap<String, List<String>>())
176                .computeIfAbsent(region.getRegionInfo().getEncodedName(), name -> new ArrayList<>())
177                .addAll(regionMobs);
178
179            } catch (FileNotFoundException e) {
180              LOG.warn(
181                "Missing file:{} Starting MOB cleaning cycle from the beginning" + " due to error",
182                currentPath, e);
183              regionMobs.clear();
184              continue;
185            } catch (IOException e) {
186              LOG.error("Failed to clean the obsolete mob files for table={}",
187                htd.getTableName().getNameAsString(), e);
188            }
189          }
190        }
191
192        if (LOG.isDebugEnabled()) {
193          LOG.debug("Found: {} active mob refs for table={}",
194            referencedMOBs.values().stream().map(inner -> inner.values())
195              .flatMap(lists -> lists.stream()).mapToInt(lists -> lists.size()).sum(),
196            htd.getTableName().getNameAsString());
197        }
198        if (LOG.isTraceEnabled()) {
199          referencedMOBs.values().stream().forEach(innerMap -> innerMap.values().stream()
200            .forEach(mobFileList -> mobFileList.stream().forEach(LOG::trace)));
201        }
202
203        // collect regions referencing MOB files belonging to the current rs
204        Set<String> regionsCovered = new HashSet<>();
205        referencedMOBs.values().stream()
206          .forEach(regionMap -> regionsCovered.addAll(regionMap.keySet()));
207
208        for (ColumnFamilyDescriptor hcd : list) {
209          List<Path> toArchive = new ArrayList<Path>();
210          String family = hcd.getNameAsString();
211          Path dir = MobUtils.getMobFamilyPath(rs.getConfiguration(), htd.getTableName(), family);
212          RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
213          while (rit.hasNext()) {
214            LocatedFileStatus lfs = rit.next();
215            Path p = lfs.getPath();
216            String[] mobParts = p.getName().split("_");
217            String regionName = mobParts[mobParts.length - 1];
218
219            // skip MOB files not belonging to a region assigned to the current rs
220            if (!regionsCovered.contains(regionName)) {
221              LOG.trace("MOB file does not belong to current rs: {}", p);
222              continue;
223            }
224
225            // check active or actively written mob files
226            Map<String, List<String>> cfMobs = referencedMOBs.get(hcd.getNameAsString());
227            if (
228              cfMobs != null && cfMobs.get(regionName) != null
229                && cfMobs.get(regionName).contains(p.getName())
230            ) {
231              LOG.trace("Keeping active MOB file: {}", p);
232              continue;
233            }
234
235            // MOB is not in a list of active references, but it can be too
236            // fresh, skip it in this case
237            long creationTime = fs.getFileStatus(p).getModificationTime();
238            if (creationTime < maxCreationTimeToArchive) {
239              LOG.trace("Archiving MOB file {} creation time={}", p,
240                (fs.getFileStatus(p).getModificationTime()));
241              toArchive.add(p);
242            } else {
243              LOG.trace("Skipping fresh file: {}. Creation time={}", p,
244                fs.getFileStatus(p).getModificationTime());
245            }
246
247          }
248          LOG.info(" MOB Cleaner found {} files to archive for table={} family={}",
249            toArchive.size(), htd.getTableName().getNameAsString(), family);
250          archiveMobFiles(rs.getConfiguration(), htd.getTableName(), family.getBytes(), toArchive);
251          LOG.info(" MOB Cleaner archived {} files, table={} family={}", toArchive.size(),
252            htd.getTableName().getNameAsString(), family);
253        }
254
255        LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());
256
257      }
258    } catch (IOException e) {
259      LOG.error("MOB Cleaner failed when trying to access the file system", e);
260    }
261  }
262
263  /**
264   * Archives the mob files.
265   * @param conf       The current configuration.
266   * @param tableName  The table name.
267   * @param family     The name of the column family.
268   * @param storeFiles The files to be archived.
269   * @throws IOException exception
270   */
271  public void archiveMobFiles(Configuration conf, TableName tableName, byte[] family,
272    List<Path> storeFiles) throws IOException {
273
274    if (storeFiles.size() == 0) {
275      // nothing to remove
276      LOG.debug("Skipping archiving old MOB files - no files found for table={} cf={}", tableName,
277        Bytes.toString(family));
278      return;
279    }
280    Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
281    FileSystem fs = storeFiles.get(0).getFileSystem(conf);
282
283    for (Path p : storeFiles) {
284      LOG.debug("MOB Cleaner is archiving: {}", p);
285      HFileArchiver.archiveStoreFile(conf, fs, MobUtils.getMobRegionInfo(tableName), mobTableDir,
286        family, p);
287    }
288  }
289}