/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Shipper;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * A compactor is a compaction algorithm associated with a given policy.
 * Base class also contains reusable parts for implementing compactors (what is common and what
 * isn't is evolving).
 * <p>
 * Compactions might be concurrent against a given store and the Compactor is shared among them. Do
 * not put mutable state into class fields. All Compactor class fields should be final or
 * effectively final. 'keepSeqIdPeriod' is an exception to this rule because unit tests may set it.
 */
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
  private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
  protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
  protected final Configuration conf;
  protected final HStore store;
  protected final int compactionKVMax;
  protected final long compactScannerSizeLimit;
  protected final Compression.Algorithm majorCompactionCompression;
  protected final Compression.Algorithm minorCompactionCompression;

  /** specify how many days to keep MVCC values during major compaction **/
  protected int keepSeqIdPeriod;

  // Configs that drive whether we drop page cache behind compactions
  protected static final String MAJOR_COMPACTION_DROP_CACHE =
    "hbase.regionserver.majorcompaction.pagecache.drop";
  protected static final String MINOR_COMPACTION_DROP_CACHE =
    "hbase.regionserver.minorcompaction.pagecache.drop";

  protected final boolean dropCacheMajor;
  protected final boolean dropCacheMinor;

  // We track progress per ongoing compaction: each call to compact() registers its own
  // CompactionProgress in this identity-based set and removes it again in its finally block.
  private final Set<CompactionProgress> progressSet =
    Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));

  // TODO: depending on Store is not good but, realistically, all compactors currently do.
  Compactor(Configuration conf, HStore store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.compactScannerSizeLimit = this.conf.getLong(HConstants.COMPACTION_SCANNER_SIZE_MAX,
      HConstants.COMPACTION_SCANNER_SIZE_MAX_DEFAULT);
    this.majorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
      ? Compression.Algorithm.NONE
      : store.getColumnFamilyDescriptor().getMajorCompactionCompressionType();
    this.minorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
      ? Compression.Algorithm.NONE
      : store.getColumnFamilyDescriptor().getMinorCompactionCompressionType();
    this.keepSeqIdPeriod =
      Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD, HConstants.MIN_KEEP_SEQID_PERIOD),
        HConstants.MIN_KEEP_SEQID_PERIOD);
    this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
    this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
  }

  protected interface CellSinkFactory<S> {
    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major,
      Consumer<Path> writerCreationTracker) throws IOException;
  }

  /**
   * The sole reason this class exists is that Java has no ref/out/pointer parameters.
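   * An instance is filled in by getFileDetails() for each request and then handed to writer
   * creation (createParams/createWriter), performCompaction() and commitWriter().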
   */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Latest put timestamp */
    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Maximum sequence id across the files we're compacting. */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Max tags length **/
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction **/
    public long minSeqIdToKeep = 0;
    /** Total size of the compacted files **/
    private long totalCompactedFilesSize = 0;
  }

  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles       Whether all files are included for compaction
   * @param major          If major compaction
   * @return The result.
   */
  private FileDetails getFileDetails(Collection<HStoreFile> filesToCompact, boolean allFiles,
    boolean major) throws IOException {
    FileDetails fd = new FileDetails();
    long oldestHFileTimestampToKeepMVCC =
      EnvironmentEdgeManager.currentTime() - (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (HStoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
        // when isAllFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemStoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemStoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFileReader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries, otherwise under-sized
      // blooms can cause progress to be miscalculated or if the user switches bloom
      // type (e.g. from ROW to ROWCOL)
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();

      // calculate the total size of the compacted files
      fd.totalCompactedFilesSize += r.length();

      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(HFileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete markers during compaction.
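      // (A family delete marker whose timestamp is older than the earliest put across all files
      // no longer masks anything, so a compaction over all files can safely drop it.)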
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information, must be an old one
          // assume we have very old puts
          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      tmp = fileInfo.get(TIMERANGE_KEY);
      fd.latestPutTs =
        tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax();
      LOG.debug(
        "Compacting {}, keycount={}, bloomtype={}, size={}, "
          + "encoding={}, compression={}, seqNum={}{}",
        (file.getPath() == null ? null : file.getPath().getName()), keyCount,
        r.getBloomFilterType().toString(), TraditionalBinaryPrefix.long2String(r.length(), "", 1),
        r.getHFileReader().getDataBlockEncoding(),
        major ? majorCompactionCompression : minorCompactionCompression, seqNum,
        (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
    }
    return fd;
  }

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @return Scanners.
   */
  private List<StoreFileScanner> createFileScanners(Collection<HStoreFile> filesToCompact,
    long smallestReadPoint, boolean useDropBehind) throws IOException {
    return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind,
      smallestReadPoint);
  }

  private long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  protected interface InternalScannerFactory {

    ScanType getScanType(CompactionRequestImpl request);

    InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException;
  }

  protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
        fd.earliestPutTs);
    }
  };

  protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
    boolean major, Consumer<Path> writerCreationTracker) {
    return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
      .compression(major ? majorCompactionCompression : minorCompactionCompression)
      .isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
      .includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
      .totalCompactedFilesSize(fd.totalCompactedFilesSize)
      .writerCreationTracker(writerCreationTracker);
  }

  /**
   * Creates a writer for a new file.
   * @param fd The file details.
   * @return Writer for a new StoreFile
   * @throws IOException if creation failed
   */
  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
    boolean major, Consumer<Path> writerCreationTracker) throws IOException {
    // When all MVCC readpoints are 0, don't write them.
    // See HBASE-8166, HBASE-12600, and HBASE-13389.
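    // (That decision is made in createParams() above, via
    // includeMVCCReadpoint(fd.maxMVCCReadpoint > 0).)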
    return store.getStoreEngine()
      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker));
  }

  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
    String fileStoragePolicy, boolean major, Consumer<Path> writerCreationTracker)
    throws IOException {
    return store.getStoreEngine()
      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker)
        .fileStoragePolicy(fileStoragePolicy));
  }

  private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
    User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return store.getScanInfo();
    }
    return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
      request, user);
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request  Compaction request.
   * @param scanType Scan type.
   * @param scanner  The default scanner created for compaction.
   * @return The scanner to use (usually the default); null if compaction should not proceed.
   */
  private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
    InternalScanner scanner, User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return scanner;
    }
    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
      request, user);
  }

  protected final List<Path> compact(final CompactionRequestImpl request,
    InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
    ThroughputController throughputController, User user) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());

    // Find the smallest read point across all the Scanners.
    long smallestReadPoint = getSmallestReadPoint();

    boolean dropCache;
    if (request.isMajor() || request.isAllFiles()) {
      dropCache = this.dropCacheMajor;
    } else {
      dropCache = this.dropCacheMinor;
    }

    InternalScanner scanner = null;
    boolean finished = false;
    List<StoreFileScanner> scanners =
      createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
    T writer = null;
    CompactionProgress progress = new CompactionProgress(fd.maxKeyCount);
    progressSet.add(progress);
    try {
      /* Include deletes, unless we are doing a major compaction */
      ScanType scanType = scannerFactory.getScanType(request);
      ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
      scanner = postCompactScannerOpen(request, scanType,
        scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
      boolean cleanSeqId = false;
      if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
        // For mvcc-sensitive family, we never set mvcc to 0.
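        // Capping smallestReadPoint at minSeqIdToKeep means performCompaction() only zeroes out
        // sequence ids that are old enough (per keepSeqIdPeriod) and below every concurrent
        // scanner's read point. With NEW_VERSION_BEHAVIOR the sequence id participates in version
        // resolution, so it is left untouched.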
        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
        cleanSeqId = true;
      }
      writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor(),
        request.getWriterCreationTracker());
      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
        throughputController, request, progress);
      if (!finished) {
        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
          + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
      }
    } finally {
      // createScanner may fail when seeking hfiles encounter Exception, e.g. even only one hfile
      // reader encounters java.io.IOException: Invalid HFile block magic:
      // \x00\x00\x00\x00\x00\x00\x00\x00
      // and then scanner will be null, but scanners for all the hfiles should be closed,
      // or else we will find leak of ESTABLISHED sockets.
      if (scanner == null) {
        for (StoreFileScanner sfs : scanners) {
          sfs.close();
        }
      } else {
        Closeables.close(scanner, true);
      }
      if (!finished) {
        if (writer != null) {
          abortWriter(writer);
        }
      } else {
        store.updateCompactedMetrics(request.isMajor(), progress);
      }
      progressSet.remove(progress);
    }
    assert finished : "We should have exited the method on all error paths";
    assert writer != null : "Writer should be non-null if no error";
    return commitWriter(writer, fd, request);
  }

  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
    CompactionRequestImpl request) throws IOException;

  protected abstract void abortWriter(T writer) throws IOException;

  /**
   * Performs the compaction.
   * @param fd                FileDetails of cell sink writer
   * @param scanner           Where to read from.
   * @param writer            Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId        When true, remove seqId (used to be mvcc) values which are <=
   *                          smallestReadPoint
   * @param request           compaction request.
   * @param progress          Progress reporter.
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
    assert writer instanceof ShipperListener;
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> cells = new ArrayList<>();
    long currentTime = EnvironmentEdgeManager.currentTime();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = currentTime;
    }
    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
      .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
        compactScannerSizeLimit)
      .build();

    throughputController.start(compactionName);
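    // A coprocessor may have replaced the default StoreScanner with an InternalScanner that does
    // not implement Shipper, hence the instanceof check below.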
    Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null;
    long shippedCallSizeLimit =
      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
    try {
      do {
        hasMore = scanner.next(cells, scannerContext);
        currentTime = EnvironmentEdgeManager.currentTime();
        if (LOG.isDebugEnabled()) {
          now = currentTime;
        }
        if (closeChecker.isTimeLimit(store, currentTime)) {
          progress.cancel();
          return false;
        }
        // output to writer:
        Cell lastCleanCell = null;
        long lastCleanCellSeqId = 0;
        for (Cell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            lastCleanCell = c;
            lastCleanCellSeqId = c.getSequenceId();
            PrivateCellUtil.setSequenceId(c, 0);
          } else {
            lastCleanCell = null;
            lastCleanCellSeqId = 0;
          }
          writer.append(c);
          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          if (closeChecker.isSizeLimit(store, len)) {
            progress.cancel();
            return false;
          }
        }
        if (shipper != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
          if (lastCleanCell != null) {
            // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly.
            // ShipperListener will do a clone of the last cells it refers to, so we need to set
            // back the sequence id before ShipperListener.beforeShipped
            PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
          }
          // Clone the cells that are in the writer so that they are freed of references,
          // if they are holding any.
          ((ShipperListener) writer).beforeShipped();
          // The SHARED block references, being read for compaction, will be kept in the prevBlocks
          // list (see HFileScannerImpl#prevBlocks). In the scan flow, after each set of cells is
          // returned to the client, we call shipped(), which can clear this list. Here we do a
          // similar thing: during the compaction (after every N cells written with a collective
          // size of 'shippedCallSizeLimit') we call shipped(), which may clear the prevBlocks
          // list.
          shipper.shipped();
          bytesWrittenProgressForShippedCall = 0;
        }
        if (lastCleanCell != null) {
          // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
          PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException(
        "Interrupted while controlling throughput of compacting " + compactionName);
    } finally {
      // Clone the last cell in the finally block because the writer will append the last cell
      // when committing. If we don't clone here, then once the scanner is closed the memory of
      // the last cell will be released. (HBASE-22582)
      ((ShipperListener) writer).beforeShipped();
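      // Tell the throughput controller this compaction is done, success or failure, so it is no
      // longer accounted for when throttling.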
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }

  /**
   * @param store             store
   * @param scanners          Store file scanners.
   * @param scanType          Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs     Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
    List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs)
    throws IOException {
    return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * @param store              The store.
   * @param scanners           Store file scanners.
   * @param smallestReadPoint  Smallest MVCC read point.
   * @param earliestPutTs      Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow   Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
    List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
    byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
    return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
      dropDeletesFromRow, dropDeletesToRow);
  }

  /**
   * Return the aggregate progress for all currently active compactions.
   */
  public CompactionProgress getProgress() {
    synchronized (progressSet) {
      long totalCompactingKVs = 0;
      long currentCompactedKVs = 0;
      long totalCompactedSize = 0;
      for (CompactionProgress progress : progressSet) {
        totalCompactingKVs += progress.totalCompactingKVs;
        currentCompactedKVs += progress.currentCompactedKVs;
        totalCompactedSize += progress.totalCompactedSize;
      }
      CompactionProgress result = new CompactionProgress(totalCompactingKVs);
      result.currentCompactedKVs = currentCompactedKVs;
      result.totalCompactedSize = totalCompactedSize;
      return result;
    }
  }

  public boolean isCompacting() {
    return !progressSet.isEmpty();
  }
}