001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Set;
030import java.util.concurrent.CompletionService;
031import java.util.concurrent.ExecutionException;
032import java.util.concurrent.ExecutorCompletionService;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.locks.ReadWriteLock;
035import java.util.concurrent.locks.ReentrantReadWriteLock;
036import java.util.function.Function;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.CellComparator;
040import org.apache.hadoop.hbase.ExtendedCell;
041import org.apache.hadoop.hbase.KeyValue;
042import org.apache.hadoop.hbase.io.hfile.BloomFilterMetrics;
043import org.apache.hadoop.hbase.log.HBaseMarkers;
044import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
045import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
046import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
047import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
048import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
049import org.apache.hadoop.hbase.util.IOExceptionRunnable;
050import org.apache.hadoop.hbase.util.ReflectionUtils;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
056import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
057
058/**
059 * StoreEngine is a factory that can create the objects necessary for HStore to operate. Since not
060 * all compaction policies, compactors and store file managers are compatible, they are tied
061 * together and replaced together via StoreEngine-s.
062 * <p/>
063 * We expose read write lock methods to upper layer for store operations:<br/>
064 * <ul>
065 * <li>Locked in shared mode when the list of component stores is looked at:
066 * <ul>
067 * <li>all reads/writes to table data</li>
068 * <li>checking for split</li>
069 * </ul>
070 * </li>
071 * <li>Locked in exclusive mode when the list of component stores is modified:
072 * <ul>
073 * <li>closing</li>
074 * <li>completing a compaction</li>
075 * </ul>
076 * </li>
077 * </ul>
078 * <p/>
079 * It is a bit confusing that we have a StoreFileManager(SFM) and then a StoreFileTracker(SFT). As
080 * its name says, SFT is used to track the store files list. The reason why we have a SFT beside SFM
081 * is that, when introducing stripe compaction, we introduced the StoreEngine and also the SFM, but
082 * actually, the SFM here is not a general 'Manager', it is only designed to manage the in memory
083 * 'stripes', so we can select different store files when scanning or compacting. The 'tracking' of
084 * store files is actually done in {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem}
085 * and {@link HStore} before we have SFT. And since SFM is designed to only holds in memory states,
086 * we will hold write lock when updating it, the lock is also used to protect the normal read/write
087 * requests. This means we'd better not add IO operations to SFM. And also, no matter what the in
088 * memory state is, stripe or not, it does not effect how we track the store files. So consider all
089 * these facts, here we introduce a separated SFT to track the store files.
090 * <p/>
091 * Here, since we always need to update SFM and SFT almost at the same time, we introduce methods in
092 * StoreEngine directly to update them both, so upper layer just need to update StoreEngine once, to
093 * reduce the possible misuse.
094 */
095@InterfaceAudience.Private
096public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
097  C extends Compactor<?>, SFM extends StoreFileManager> {
098
099  private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);
100
101  private static final String READ_FULLY_ON_VALIDATE_KEY = "hbase.hstore.validate.read_fully";
102  private static final boolean DEFAULT_READ_FULLY_ON_VALIDATE = false;
103
104  protected SF storeFlusher;
105  protected CP compactionPolicy;
106  protected C compactor;
107  protected SFM storeFileManager;
108
109  private final BloomFilterMetrics bloomFilterMetrics = new BloomFilterMetrics();
110  private Configuration conf;
111  private StoreContext ctx;
112  private RegionCoprocessorHost coprocessorHost;
113  private Function<String, ExecutorService> openStoreFileThreadPoolCreator;
114  private StoreFileTracker storeFileTracker;
115
116  private final ReadWriteLock storeLock = new ReentrantReadWriteLock();
117
118  /**
119   * The name of the configuration parameter that specifies the class of a store engine that is used
120   * to manage and compact HBase store files.
121   */
122  public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";
123
124  private static final Class<? extends StoreEngine<?, ?, ?, ?>> DEFAULT_STORE_ENGINE_CLASS =
125    DefaultStoreEngine.class;
126
127  /**
128   * Acquire read lock of this store.
129   */
130  public void readLock() {
131    storeLock.readLock().lock();
132  }
133
134  /**
135   * Release read lock of this store.
136   */
137  public void readUnlock() {
138    storeLock.readLock().unlock();
139  }
140
141  /**
142   * Acquire write lock of this store.
143   */
144  public void writeLock() {
145    storeLock.writeLock().lock();
146  }
147
148  /**
149   * Release write lock of this store.
150   */
151  public void writeUnlock() {
152    storeLock.writeLock().unlock();
153  }
154
155  /** Returns Compaction policy to use. */
156  public CompactionPolicy getCompactionPolicy() {
157    return this.compactionPolicy;
158  }
159
160  /** Returns Compactor to use. */
161  public Compactor<?> getCompactor() {
162    return this.compactor;
163  }
164
165  /** Returns Store file manager to use. */
166  public StoreFileManager getStoreFileManager() {
167    return this.storeFileManager;
168  }
169
170  /** Returns Store flusher to use. */
171  StoreFlusher getStoreFlusher() {
172    return this.storeFlusher;
173  }
174
175  private StoreFileTracker createStoreFileTracker(Configuration conf, HStore store) {
176    return StoreFileTrackerFactory.create(conf, store.isPrimaryReplicaStore(),
177      store.getStoreContext());
178  }
179
180  /**
181   * @param filesCompacting Files currently compacting
182   * @return whether a compaction selection is possible
183   */
184  public abstract boolean needsCompaction(List<HStoreFile> filesCompacting);
185
186  /**
187   * Creates an instance of a compaction context specific to this engine. Doesn't actually select or
188   * start a compaction. See CompactionContext class comment.
189   * @return New CompactionContext object.
190   */
191  public abstract CompactionContext createCompaction() throws IOException;
192
193  /**
194   * Create the StoreEngine's components.
195   */
196  protected abstract void createComponents(Configuration conf, HStore store,
197    CellComparator cellComparator) throws IOException;
198
199  protected final void createComponentsOnce(Configuration conf, HStore store,
200    CellComparator cellComparator) throws IOException {
201    assert compactor == null && compactionPolicy == null && storeFileManager == null
202      && storeFlusher == null && storeFileTracker == null;
203    createComponents(conf, store, cellComparator);
204    this.conf = conf;
205    this.ctx = store.getStoreContext();
206    this.coprocessorHost = store.getHRegion().getCoprocessorHost();
207    this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
208    this.storeFileTracker = createStoreFileTracker(conf, store);
209    assert compactor != null && compactionPolicy != null && storeFileManager != null
210      && storeFlusher != null;
211  }
212
213  /**
214   * Create a writer for writing new store files.
215   * @return Writer for a new StoreFile
216   */
217  public StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
218    return storeFileTracker.createWriter(params);
219  }
220
221  public HStoreFile createStoreFileAndReader(Path p) throws IOException {
222    StoreFileInfo info = storeFileTracker.getStoreFileInfo(p, ctx.isPrimaryReplicaStore());
223    return createStoreFileAndReader(info);
224  }
225
226  public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
227    info.setRegionCoprocessorHost(coprocessorHost);
228    HStoreFile storeFile = new HStoreFile(info, ctx.getFamily().getBloomFilterType(),
229      ctx.getCacheConf(), bloomFilterMetrics);
230    storeFile.initReader();
231    return storeFile;
232  }
233
234  /**
235   * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
236   * operation.
237   * @param path         the path to the store file
238   * @param isCompaction whether this is called from the context of a compaction
239   */
240  public void validateStoreFile(Path path, boolean isCompaction) throws IOException {
241    HStoreFile storeFile = null;
242    try {
243      storeFile = createStoreFileAndReader(path);
244      if (conf.getBoolean(READ_FULLY_ON_VALIDATE_KEY, DEFAULT_READ_FULLY_ON_VALIDATE)) {
245        if (storeFile.getFirstKey().isEmpty()) {
246          LOG.debug("'{}=true' but storefile does not contain any data. skipping validation.",
247            READ_FULLY_ON_VALIDATE_KEY);
248          return;
249        }
250        LOG.debug("Validating the store file by reading the first cell from each block : {}", path);
251        StoreFileReader reader = storeFile.getReader();
252        try (StoreFileScanner scanner =
253          reader.getStoreFileScanner(false, false, isCompaction, Long.MAX_VALUE, 0, false)) {
254          boolean hasNext = scanner.seek(KeyValue.LOWESTKEY);
255          assert hasNext : "StoreFile contains no data";
256          for (ExtendedCell cell = scanner.next(); cell != null; cell = scanner.next()) {
257            ExtendedCell nextIndexedKey = scanner.getNextIndexedKey();
258            if (nextIndexedKey == null) {
259              break;
260            }
261            scanner.seek(nextIndexedKey);
262          }
263        }
264      }
265    } catch (IOException e) {
266      LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e);
267      throw e;
268    } finally {
269      if (storeFile != null) {
270        storeFile.closeStoreFile(false);
271      }
272    }
273  }
274
275  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
276    throws IOException {
277    if (CollectionUtils.isEmpty(files)) {
278      return Collections.emptyList();
279    }
280    // initialize the thread pool for opening store files in parallel..
281    ExecutorService storeFileOpenerThreadPool =
282      openStoreFileThreadPoolCreator.apply("StoreFileOpener-" + ctx.getRegionInfo().getEncodedName()
283        + "-" + ctx.getFamily().getNameAsString());
284    CompletionService<HStoreFile> completionService =
285      new ExecutorCompletionService<>(storeFileOpenerThreadPool);
286
287    int totalValidStoreFile = 0;
288    for (StoreFileInfo storeFileInfo : files) {
289      // The StoreFileInfo will carry store configuration down to HFile, we need to set it to
290      // our store's CompoundConfiguration here.
291      storeFileInfo.setConf(conf);
292      // open each store file in parallel
293      completionService.submit(() -> createStoreFileAndReader(storeFileInfo));
294      totalValidStoreFile++;
295    }
296
297    Set<String> compactedStoreFiles = new HashSet<>();
298    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
299    IOException ioe = null;
300    try {
301      for (int i = 0; i < totalValidStoreFile; i++) {
302        try {
303          HStoreFile storeFile = completionService.take().get();
304          if (storeFile != null) {
305            LOG.debug("loaded {}", storeFile);
306            results.add(storeFile);
307            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
308          }
309        } catch (InterruptedException e) {
310          if (ioe == null) {
311            ioe = new InterruptedIOException(e.getMessage());
312          }
313        } catch (ExecutionException e) {
314          if (ioe == null) {
315            ioe = new IOException(e.getCause());
316          }
317        }
318      }
319    } finally {
320      storeFileOpenerThreadPool.shutdownNow();
321    }
322    if (ioe != null) {
323      // close StoreFile readers
324      boolean evictOnClose = ctx.getCacheConf() == null || ctx.getCacheConf().shouldEvictOnClose();
325      for (HStoreFile file : results) {
326        try {
327          if (file != null) {
328            file.closeStoreFile(evictOnClose);
329          }
330        } catch (IOException e) {
331          LOG.warn("Could not close store file {}", file, e);
332        }
333      }
334      throw ioe;
335    }
336
337    // Should not archive the compacted store files when region warmup. See HBASE-22163.
338    if (!warmup) {
339      // Remove the compacted files from result
340      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
341      for (HStoreFile storeFile : results) {
342        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
343          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
344          storeFile.getReader().close(
345            storeFile.getCacheConf() == null || storeFile.getCacheConf().shouldEvictOnClose());
346          filesToRemove.add(storeFile);
347        }
348      }
349      results.removeAll(filesToRemove);
350      if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) {
351        LOG.debug("Moving the files {} to archive", filesToRemove);
352        ctx.getRegionFileSystem().removeStoreFiles(ctx.getFamily().getNameAsString(),
353          filesToRemove);
354      }
355    }
356
357    return results;
358  }
359
360  public void initialize(boolean warmup) throws IOException {
361    List<StoreFileInfo> fileInfos = storeFileTracker.load();
362    List<HStoreFile> files = openStoreFiles(fileInfos, warmup);
363    storeFileManager.loadFiles(files);
364  }
365
366  public void refreshStoreFiles() throws IOException {
367    List<StoreFileInfo> fileInfos = storeFileTracker.load();
368    refreshStoreFilesInternal(fileInfos);
369  }
370
371  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
372    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
373    for (String file : newFiles) {
374      storeFiles.add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(),
375        file, storeFileTracker));
376    }
377    refreshStoreFilesInternal(storeFiles);
378  }
379
380  /**
381   * Checks the underlying store files, and opens the files that have not been opened, and removes
382   * the store file readers for store files no longer available. Mainly used by secondary region
383   * replicas to keep up to date with the primary region files.
384   */
385  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
386    Collection<HStoreFile> currentFiles = storeFileManager.getStoreFiles();
387    Collection<HStoreFile> compactedFiles = storeFileManager.getCompactedfiles();
388    if (currentFiles == null) {
389      currentFiles = Collections.emptySet();
390    }
391    if (newFiles == null) {
392      newFiles = Collections.emptySet();
393    }
394    if (compactedFiles == null) {
395      compactedFiles = Collections.emptySet();
396    }
397
398    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
399    for (HStoreFile sf : currentFiles) {
400      currentFilesSet.put(sf.getFileInfo(), sf);
401    }
402    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
403    for (HStoreFile sf : compactedFiles) {
404      compactedFilesSet.put(sf.getFileInfo(), sf);
405    }
406
407    Set<StoreFileInfo> newFilesSet = new HashSet<>(newFiles);
408    // Exclude the files that have already been compacted
409    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
410    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
411    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
412
413    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
414      return;
415    }
416
417    LOG.info("Refreshing store files for {} files to add: {} files to remove: {}", this,
418      toBeAddedFiles, toBeRemovedFiles);
419
420    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
421    for (StoreFileInfo sfi : toBeRemovedFiles) {
422      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
423    }
424
425    // try to open the files
426    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);
427
428    // propagate the file changes to the underlying store file manager
429    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
430    }, () -> {
431    }); // won't throw an exception
432  }
433
434  /**
435   * Commit the given {@code files}.
436   * <p/>
437   * We will move the file into data directory, and open it.
438   * @param files        the files want to commit
439   * @param isCompaction whether this is called from the context of a compaction
440   * @param validate     whether to validate the store files
441   * @return the committed store files
442   */
443  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean isCompaction, boolean validate)
444    throws IOException {
445    List<HStoreFile> committedFiles = new ArrayList<>(files.size());
446    HRegionFileSystem hfs = ctx.getRegionFileSystem();
447    String familyName = ctx.getFamily().getNameAsString();
448    Path storeDir = hfs.getStoreDir(familyName);
449    for (Path file : files) {
450      try {
451        if (validate) {
452          validateStoreFile(file, isCompaction);
453        }
454        Path committedPath;
455        // As we want to support writing to data directory directly, here we need to check whether
456        // the store file is already in the right place
457        if (file.getParent() != null && file.getParent().equals(storeDir)) {
458          // already in the right place, skip renaming
459          committedPath = file;
460        } else {
461          // Write-out finished successfully, move into the right spot
462          committedPath = hfs.commitStoreFile(familyName, file);
463        }
464        HStoreFile sf = createStoreFileAndReader(committedPath);
465        committedFiles.add(sf);
466      } catch (IOException e) {
467        LOG.error("Failed to commit store file {}", file, e);
468        // Try to delete the files we have committed before.
469        // It is OK to fail when deleting as leaving the file there does not cause any data
470        // corruption problem. It just introduces some duplicated data which may impact read
471        // performance a little when reading before compaction.
472        for (HStoreFile sf : committedFiles) {
473          Path pathToDelete = sf.getPath();
474          try {
475            sf.deleteStoreFile();
476          } catch (IOException deleteEx) {
477            LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
478              deleteEx);
479          }
480        }
481        throw new IOException("Failed to commit the flush", e);
482      }
483    }
484    return committedFiles;
485  }
486
487  /**
488   * Add the store files to store file manager, and also record it in the store file tracker.
489   * <p/>
490   * The {@code actionAfterAdding} will be executed after the insertion to store file manager, under
491   * the lock protection. Usually this is for clear the memstore snapshot.
492   */
493  public void addStoreFiles(Collection<HStoreFile> storeFiles,
494    IOExceptionRunnable actionAfterAdding) throws IOException {
495    storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
496    writeLock();
497    try {
498      storeFileManager.insertNewFiles(storeFiles);
499      actionAfterAdding.run();
500    } finally {
501      // We need the lock, as long as we are updating the storeFiles
502      // or changing the memstore. Let us release it before calling
503      // notifyChangeReadersObservers. See HBASE-4485 for a possible
504      // deadlock scenario that could have happened if continue to hold
505      // the lock.
506      writeUnlock();
507    }
508  }
509
510  public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
511    Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)
512    throws IOException {
513    storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
514      StoreUtils.toStoreFileInfo(newFiles));
515    walMarkerWriter.run();
516    writeLock();
517    try {
518      storeFileManager.addCompactionResults(compactedFiles, newFiles);
519      actionUnderLock.run();
520    } finally {
521      writeUnlock();
522    }
523  }
524
525  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
526    writeLock();
527    try {
528      storeFileManager.removeCompactedFiles(compactedFiles);
529    } finally {
530      writeUnlock();
531    }
532  }
533
534  /**
535   * Create the StoreEngine configured for the given Store.
536   * @param store          The store. An unfortunate dependency needed due to it being passed to
537   *                       coprocessors via the compactor.
538   * @param conf           Store configuration.
539   * @param cellComparator CellComparator for storeFileManager.
540   * @return StoreEngine to use.
541   */
542  public static StoreEngine<?, ?, ?, ?> create(HStore store, Configuration conf,
543    CellComparator cellComparator) throws IOException {
544    String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
545    try {
546      StoreEngine<?, ?, ?, ?> se =
547        ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, new Object[] {});
548      se.createComponentsOnce(conf, store, cellComparator);
549      return se;
550    } catch (Exception e) {
551      throw new IOException("Unable to load configured store engine '" + className + "'", e);
552    }
553  }
554
555  /**
556   * Whether the implementation of the used storefile tracker requires you to write to temp
557   * directory first, i.e, does not allow broken store files under the actual data directory.
558   */
559  public boolean requireWritingToTmpDirFirst() {
560    return storeFileTracker.requireWritingToTmpDirFirst();
561  }
562
563  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
564      allowedOnPath = ".*/TestHStore.java")
565  ReadWriteLock getLock() {
566    return storeLock;
567  }
568
569  public BloomFilterMetrics getBloomFilterMetrics() {
570    return bloomFilterMetrics;
571  }
572}