/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.BloomFilterMetrics;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.IOExceptionRunnable;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * StoreEngine is a factory that can create the objects necessary for HStore to operate. Since not
 * all compaction policies, compactors and store file managers are compatible, they are tied
 * together and replaced together via StoreEngine-s.
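 * <p/>
 * For illustration, a deployment can switch a store to a different engine by pointing
 * {@link #STORE_ENGINE_CLASS_KEY} at another implementation; the stripe-based engine mentioned
 * below is one such example (an illustrative snippet, not a recommendation):
 *
 * <pre>{@code
 * // Select the store engine implementation; DefaultStoreEngine is used when unset.
 * conf.set("hbase.hstore.engine.class",
 *   "org.apache.hadoop.hbase.regionserver.StripeStoreEngine");
 * }</pre>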
 * <p/>
 * We expose read write lock methods to the upper layer for store operations:<br/>
 * <ul>
 * <li>Locked in shared mode when the list of component stores is looked at:
 * <ul>
 * <li>all reads/writes to table data</li>
 * <li>checking for split</li>
 * </ul>
 * </li>
 * <li>Locked in exclusive mode when the list of component stores is modified:
 * <ul>
 * <li>closing</li>
 * <li>completing a compaction</li>
 * </ul>
 * </li>
 * </ul>
 * <p/>
 * It is a bit confusing that we have a StoreFileManager(SFM) and then a StoreFileTracker(SFT). As
 * its name says, SFT is used to track the store file list. The reason why we have an SFT besides
 * the SFM is that, when introducing stripe compaction, we introduced the StoreEngine and also the
 * SFM, but actually, the SFM here is not a general 'Manager'; it is only designed to manage the in
 * memory 'stripes', so we can select different store files when scanning or compacting. The
 * 'tracking' of store files was actually done in
 * {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem} and {@link HStore} before we had
 * SFT. And since SFM is designed to only hold in memory state, we hold the write lock when
 * updating it; the lock is also used to protect the normal read/write requests. This means we'd
 * better not add IO operations to SFM. Also, no matter what the in memory state is, stripe or
 * not, it does not affect how we track the store files. Considering all these facts, we introduce
 * a separate SFT to track the store files.
 * <p/>
 * Here, since we always need to update the SFM and the SFT at almost the same time, we introduce
 * methods in StoreEngine directly to update them both, so the upper layer only needs to update
 * the StoreEngine once, to reduce possible misuse.
 */
@InterfaceAudience.Private
public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
  C extends Compactor<?>, SFM extends StoreFileManager> {

  private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);

  private static final String READ_FULLY_ON_VALIDATE_KEY = "hbase.hstore.validate.read_fully";
  private static final boolean DEFAULT_READ_FULLY_ON_VALIDATE = false;

  protected SF storeFlusher;
  protected CP compactionPolicy;
  protected C compactor;
  protected SFM storeFileManager;

  private final BloomFilterMetrics bloomFilterMetrics = new BloomFilterMetrics();
  private Configuration conf;
  private StoreContext ctx;
  private RegionCoprocessorHost coprocessorHost;
  private Function<String, ExecutorService> openStoreFileThreadPoolCreator;
  private StoreFileTracker storeFileTracker;

  private final ReadWriteLock storeLock = new ReentrantReadWriteLock();

  /**
   * The name of the configuration parameter that specifies the class of a store engine that is
   * used to manage and compact HBase store files.
   */
  public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";

  private static final Class<? extends StoreEngine<?, ?, ?, ?>> DEFAULT_STORE_ENGINE_CLASS =
    DefaultStoreEngine.class;

  /**
   * Acquire read lock of this store.
   */
  public void readLock() {
    storeLock.readLock().lock();
  }

  /**
   * Release read lock of this store.
   */
  public void readUnlock() {
    storeLock.readLock().unlock();
  }

  /**
   * Acquire write lock of this store.
   */
  public void writeLock() {
    storeLock.writeLock().lock();
  }

  /**
   * Release write lock of this store.
   */
  public void writeUnlock() {
    storeLock.writeLock().unlock();
  }
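
  // Illustrative only: the intended caller pattern for the lock methods above. This sketch is not
  // code from HStore; it assumes an `engine` reference and simply shows that readers snapshot the
  // store file list under the shared lock, while closing and compaction commits take the
  // exclusive lock.
  //
  //   engine.readLock();
  //   try {
  //     Collection<HStoreFile> files = engine.getStoreFileManager().getStoreFiles();
  //     // ... create scanners over this consistent snapshot ...
  //   } finally {
  //     engine.readUnlock();
  //   }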

  /** Returns Compaction policy to use. */
  public CompactionPolicy getCompactionPolicy() {
    return this.compactionPolicy;
  }

  /** Returns Compactor to use. */
  public Compactor<?> getCompactor() {
    return this.compactor;
  }

  /** Returns Store file manager to use. */
  public StoreFileManager getStoreFileManager() {
    return this.storeFileManager;
  }

  /** Returns Store flusher to use. */
  StoreFlusher getStoreFlusher() {
    return this.storeFlusher;
  }

  private StoreFileTracker createStoreFileTracker(Configuration conf, HStore store) {
    return StoreFileTrackerFactory.create(conf, store.isPrimaryReplicaStore(),
      store.getStoreContext());
  }

  /**
   * @param filesCompacting Files currently compacting
   * @return whether a compaction selection is possible
   */
  public abstract boolean needsCompaction(List<HStoreFile> filesCompacting);

  /**
   * Creates an instance of a compaction context specific to this engine. Doesn't actually select
   * or start a compaction. See CompactionContext class comment.
   * @return New CompactionContext object.
   */
  public abstract CompactionContext createCompaction() throws IOException;

  /**
   * Create the StoreEngine's components.
   */
  protected abstract void createComponents(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException;

  protected final void createComponentsOnce(Configuration conf, HStore store,
    CellComparator cellComparator) throws IOException {
    assert compactor == null && compactionPolicy == null && storeFileManager == null
      && storeFlusher == null && storeFileTracker == null;
    createComponents(conf, store, cellComparator);
    this.conf = conf;
    this.ctx = store.getStoreContext();
    this.coprocessorHost = store.getHRegion().getCoprocessorHost();
    this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
    this.storeFileTracker = createStoreFileTracker(conf, store);
    assert compactor != null && compactionPolicy != null && storeFileManager != null
      && storeFlusher != null;
  }
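
  // A sketch of what a concrete engine's createComponents is expected to wire up, modeled loosely
  // on DefaultStoreEngine (the exact constructor arguments are assumptions here; the contract is
  // simply that all four fields are non-null afterwards, as the assertions above verify):
  //
  //   @Override
  //   protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
  //     throws IOException {
  //     this.storeFlusher = new DefaultStoreFlusher(conf, store);
  //     this.compactor = new DefaultCompactor(conf, store);
  //     this.compactionPolicy = new ExploringCompactionPolicy(conf, ...);
  //     this.storeFileManager = new DefaultStoreFileManager(comparator, ...);
  //   }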

  /**
   * Create a writer for writing new store files.
   * @return Writer for a new StoreFile
   */
  public StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    return storeFileTracker.createWriter(params);
  }

  public HStoreFile createStoreFileAndReader(Path p) throws IOException {
    StoreFileInfo info = storeFileTracker.getStoreFileInfo(p, ctx.isPrimaryReplicaStore());
    return createStoreFileAndReader(info);
  }

  public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
    info.setRegionCoprocessorHost(coprocessorHost);
    HStoreFile storeFile = new HStoreFile(info, ctx.getFamily().getBloomFilterType(),
      ctx.getCacheConf(), bloomFilterMetrics);
    storeFile.initReader();
    return storeFile;
  }

  /**
   * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
   * operation.
   * @param path         the path to the store file
   * @param isCompaction whether this is called from the context of a compaction
   */
  public void validateStoreFile(Path path, boolean isCompaction) throws IOException {
    HStoreFile storeFile = null;
    try {
      storeFile = createStoreFileAndReader(path);
      if (conf.getBoolean(READ_FULLY_ON_VALIDATE_KEY, DEFAULT_READ_FULLY_ON_VALIDATE)) {
        if (storeFile.getFirstKey().isEmpty()) {
          LOG.debug("'{}=true' but storefile does not contain any data. Skipping validation.",
            READ_FULLY_ON_VALIDATE_KEY);
          return;
        }
        LOG.debug("Validating the store file by reading the first cell from each block: {}",
          path);
        StoreFileReader reader = storeFile.getReader();
        try (StoreFileScanner scanner =
          reader.getStoreFileScanner(false, false, isCompaction, Long.MAX_VALUE, 0, false)) {
          boolean hasNext = scanner.seek(KeyValue.LOWESTKEY);
          assert hasNext : "StoreFile contains no data";
          for (ExtendedCell cell = scanner.next(); cell != null; cell = scanner.next()) {
            ExtendedCell nextIndexedKey = scanner.getNextIndexedKey();
            if (nextIndexedKey == null) {
              break;
            }
            scanner.seek(nextIndexedKey);
          }
        }
      }
    } catch (IOException e) {
      LOG.error("Failed to open store file: {}, keeping it in tmp location", path, e);
      throw e;
    } finally {
      if (storeFile != null) {
        storeFile.closeStoreFile(false);
      }
    }
  }
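
  // Illustrative only: enabling the deeper validation path above. With the flag set, commit-time
  // validation seeks to the first cell of every block instead of merely opening and closing the
  // reader (a sketch of the configuration, using the key defined at the top of this class):
  //
  //   conf.setBoolean("hbase.hstore.validate.read_fully", true);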

  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
    throws IOException {
    if (CollectionUtils.isEmpty(files)) {
      return Collections.emptyList();
    }
    // initialize the thread pool for opening store files in parallel.
    ExecutorService storeFileOpenerThreadPool =
      openStoreFileThreadPoolCreator.apply("StoreFileOpener-"
        + ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString());
    CompletionService<HStoreFile> completionService =
      new ExecutorCompletionService<>(storeFileOpenerThreadPool);

    int totalValidStoreFile = 0;
    for (StoreFileInfo storeFileInfo : files) {
      // The StoreFileInfo will carry store configuration down to HFile, so we need to set it to
      // our store's CompoundConfiguration here.
      storeFileInfo.setConf(conf);
      // open each store file in parallel
      completionService.submit(() -> createStoreFileAndReader(storeFileInfo));
      totalValidStoreFile++;
    }

    Set<String> compactedStoreFiles = new HashSet<>();
    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
    IOException ioe = null;
    try {
      for (int i = 0; i < totalValidStoreFile; i++) {
        try {
          HStoreFile storeFile = completionService.take().get();
          if (storeFile != null) {
            LOG.debug("loaded {}", storeFile);
            results.add(storeFile);
            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
          }
        } catch (InterruptedException e) {
          if (ioe == null) {
            ioe = new InterruptedIOException(e.getMessage());
          }
        } catch (ExecutionException e) {
          if (ioe == null) {
            ioe = new IOException(e.getCause());
          }
        }
      }
    } finally {
      storeFileOpenerThreadPool.shutdownNow();
    }
    if (ioe != null) {
      // close StoreFile readers
      boolean evictOnClose =
        ctx.getCacheConf() == null || ctx.getCacheConf().shouldEvictOnClose();
      for (HStoreFile file : results) {
        try {
          if (file != null) {
            file.closeStoreFile(evictOnClose);
          }
        } catch (IOException e) {
          LOG.warn("Could not close store file {}", file, e);
        }
      }
      throw ioe;
    }

    // Should not archive the compacted store files when region warmup. See HBASE-22163.
    if (!warmup) {
      // Remove the compacted files from result
      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
      for (HStoreFile storeFile : results) {
        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
          storeFile.getReader().close(
            storeFile.getCacheConf() == null || storeFile.getCacheConf().shouldEvictOnClose());
          filesToRemove.add(storeFile);
        }
      }
      results.removeAll(filesToRemove);
      if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) {
        LOG.debug("Moving the files {} to archive", filesToRemove);
        ctx.getRegionFileSystem().removeStoreFiles(ctx.getFamily().getNameAsString(),
          filesToRemove);
      }
    }

    return results;
  }

  public void initialize(boolean warmup) throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    List<HStoreFile> files = openStoreFiles(fileInfos, warmup);
    storeFileManager.loadFiles(files);
  }

  public void refreshStoreFiles() throws IOException {
    List<StoreFileInfo> fileInfos = storeFileTracker.load();
    refreshStoreFilesInternal(fileInfos);
  }

  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
    for (String file : newFiles) {
      storeFiles.add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(),
        file, storeFileTracker));
    }
    refreshStoreFilesInternal(storeFiles);
  }
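
  // Illustrative only: how a secondary region replica is expected to keep its view current (a
  // sketch; the real call sites live in HRegion). Reloading the tracked file list and diffing it
  // against the StoreFileManager's state is what refreshStoreFilesInternal below does.
  //
  //   engine.refreshStoreFiles(); // reload from the tracker and reconcile the in-memory view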

  /**
   * Checks the underlying store files, opens the files that have not been opened, and removes the
   * store file readers for store files that are no longer available. Mainly used by secondary
   * region replicas to keep up to date with the primary region files.
   */
  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
    Collection<HStoreFile> currentFiles = storeFileManager.getStoreFiles();
    Collection<HStoreFile> compactedFiles = storeFileManager.getCompactedfiles();
    if (currentFiles == null) {
      currentFiles = Collections.emptySet();
    }
    if (newFiles == null) {
      newFiles = Collections.emptySet();
    }
    if (compactedFiles == null) {
      compactedFiles = Collections.emptySet();
    }

    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
    for (HStoreFile sf : currentFiles) {
      currentFilesSet.put(sf.getFileInfo(), sf);
    }
    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
    for (HStoreFile sf : compactedFiles) {
      compactedFilesSet.put(sf.getFileInfo(), sf);
    }

    Set<StoreFileInfo> newFilesSet = new HashSet<>(newFiles);
    // Exclude the files that have already been compacted
    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);

    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
      return;
    }

    LOG.info("Refreshing store files for {} files to add: {} files to remove: {}", this,
      toBeAddedFiles, toBeRemovedFiles);

    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
    for (StoreFileInfo sfi : toBeRemovedFiles) {
      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
    }

    // try to open the files
    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);

    // propagate the file changes to the underlying store file manager
    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
    }, () -> {
    }); // won't throw an exception
  }

  /**
   * Commit the given {@code files}.
   * <p/>
   * We will move the file into the data directory, and open it.
   * @param files        the files to commit
   * @param isCompaction whether this is called from the context of a compaction
   * @param validate     whether to validate the store files
   * @return the committed store files
   */
  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean isCompaction,
    boolean validate) throws IOException {
    List<HStoreFile> committedFiles = new ArrayList<>(files.size());
    HRegionFileSystem hfs = ctx.getRegionFileSystem();
    String familyName = ctx.getFamily().getNameAsString();
    Path storeDir = hfs.getStoreDir(familyName);
    for (Path file : files) {
      try {
        if (validate) {
          validateStoreFile(file, isCompaction);
        }
        Path committedPath;
        // As we want to support writing to the data directory directly, here we need to check
        // whether the store file is already in the right place
        if (file.getParent() != null && file.getParent().equals(storeDir)) {
          // already in the right place, skip renaming
          committedPath = file;
        } else {
          // Write-out finished successfully, move into the right spot
          committedPath = hfs.commitStoreFile(familyName, file);
        }
        HStoreFile sf = createStoreFileAndReader(committedPath);
        committedFiles.add(sf);
      } catch (IOException e) {
        LOG.error("Failed to commit store file {}", file, e);
        // Try to delete the files we have committed before.
        // It is OK to fail when deleting, as leaving the file there does not cause any data
        // corruption problem. It just introduces some duplicated data which may impact read
        // performance a little when reading before compaction.
        for (HStoreFile sf : committedFiles) {
          Path pathToDelete = sf.getPath();
          try {
            sf.deleteStoreFile();
          } catch (IOException deleteEx) {
            LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
              deleteEx);
          }
        }
        throw new IOException("Failed to commit the flush", e);
      }
    }
    return committedFiles;
  }

  /**
   * Add the store files to the store file manager, and also record them in the store file
   * tracker.
   * <p/>
   * The {@code actionAfterAdding} will be executed after the insertion to the store file manager,
   * under the lock protection. Usually this is for clearing the memstore snapshot.
   */
  public void addStoreFiles(Collection<HStoreFile> storeFiles,
    IOExceptionRunnable actionAfterAdding) throws IOException {
    storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
    writeLock();
    try {
      storeFileManager.insertNewFiles(storeFiles);
      actionAfterAdding.run();
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if we continued to
      // hold the lock.
      writeUnlock();
    }
  }

  public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
    Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter,
    Runnable actionUnderLock) throws IOException {
    storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
      StoreUtils.toStoreFileInfo(newFiles));
    walMarkerWriter.run();
    writeLock();
    try {
      storeFileManager.addCompactionResults(compactedFiles, newFiles);
      actionUnderLock.run();
    } finally {
      writeUnlock();
    }
  }

  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
    writeLock();
    try {
      storeFileManager.removeCompactedFiles(compactedFiles);
    } finally {
      writeUnlock();
    }
  }

  /**
   * Create the StoreEngine configured for the given Store.
   * @param store          The store. An unfortunate dependency needed due to it being passed to
   *                       coprocessors via the compactor.
   * @param conf           Store configuration.
   * @param cellComparator CellComparator for storeFileManager.
   * @return StoreEngine to use.
   */
  public static StoreEngine<?, ?, ?, ?> create(HStore store, Configuration conf,
    CellComparator cellComparator) throws IOException {
    String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
    try {
      StoreEngine<?, ?, ?, ?> se =
        ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, new Object[] {});
      se.createComponentsOnce(conf, store, cellComparator);
      return se;
    } catch (Exception e) {
      throw new IOException("Unable to load configured store engine '" + className + "'", e);
    }
  }
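
  // Illustrative only: the expected lifecycle of an engine (a sketch; the real call sites live in
  // HStore, and `tmpPaths` here is a hypothetical list of freshly flushed files). A flush commits
  // the new files and then registers them through the engine, so the tracker and the manager stay
  // in sync, as the class comment describes:
  //
  //   StoreEngine<?, ?, ?, ?> engine = StoreEngine.create(store, conf, cellComparator);
  //   engine.initialize(false);
  //   List<HStoreFile> flushed = engine.commitStoreFiles(tmpPaths, false, true);
  //   engine.addStoreFiles(flushed, () -> { /* clear the memstore snapshot */ });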

  /**
   * Whether the implementation of the used storefile tracker requires you to write to the temp
   * directory first, i.e., does not allow broken store files under the actual data directory.
   */
  public boolean requireWritingToTmpDirFirst() {
    return storeFileTracker.requireWritingToTmpDirFirst();
  }

  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
      allowedOnPath = ".*/TestHStore.java")
  ReadWriteLock getLock() {
    return storeLock;
  }

  public BloomFilterMetrics getBloomFilterMetrics() {
    return bloomFilterMetrics;
  }
}