/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob;

import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

/**
 * Compact passed set of files in the mob-enabled column family.
 */
@InterfaceAudience.Private
public class DefaultMobStoreCompactor extends DefaultCompactor {

  private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreCompactor.class);
  protected long mobSizeThreshold;
  protected HMobStore mobStore;
  protected boolean ioOptimizedMode = false;

  /*
   * MOB file reference set thread local variable. It contains set of a MOB file names, which newly
   * compacted store file has references to. This variable is populated during compaction and the
   * content of it is written into meta section of a newly created store file at the final step of
   * compaction process.
   */
  static ThreadLocal<SetMultimap<TableName, String>> mobRefSet =
    ThreadLocal.withInitial(HashMultimap::create);

  /*
   * Is it user or system-originated request.
   */
  static ThreadLocal<Boolean> userRequest = ThreadLocal.withInitial(() -> Boolean.FALSE);

  /*
   * Disable IO mode. IO mode can be forcefully disabled if compactor finds old MOB file
   * (pre-distributed compaction). This means that migration has not been completed yet. During data
   * migration (upgrade) process only general compaction is allowed.
   */
  static ThreadLocal<Boolean> disableIO = ThreadLocal.withInitial(() -> Boolean.FALSE);

  /*
   * Map : MOB file name - file length. Can be expensive for large amount of MOB files.
   */
  static ThreadLocal<HashMap<String, Long>> mobLengthMap = ThreadLocal.withInitial(HashMap::new);

  /** Builds the store scanner; deletes are dropped only when all files are being compacted. */
  private final InternalScannerFactory scannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
        fd.earliestPutTs);
    }
  };

  /** Builds the store file writer used as the compaction sink. */
  private final CellSinkFactory<StoreFileWriter> writerFactory =
    new CellSinkFactory<StoreFileWriter>() {
      @Override
      public StoreFileWriter createWriter(InternalScanner scanner, FileDetails fd,
        boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
        throws IOException {
        // make this writer with tags always because of possible new cells with tags.
        return store.getStoreEngine()
          .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker)
            .includeMVCCReadpoint(true).includesTag(true));
      }
    };

  /**
   * Creates a compactor bound to a MOB-enabled store.
   * @param conf  configuration; {@link MobConstants#MOB_COMPACTION_TYPE_KEY} selects whether the
   *              I/O optimized mode is configured
   * @param store the store to compact; must be an {@link HMobStore}
   * @throws IllegalArgumentException if {@code store} is not an {@link HMobStore}
   */
  public DefaultMobStoreCompactor(Configuration conf, HStore store) {
    super(conf, store);
    // The mob cells reside in the mob-enabled column family which is held by HMobStore.
    // During the compaction, the compactor reads the cells from the mob files and
    // probably creates new mob files. All of these operations are included in HMobStore,
    // so we need to cast the Store to HMobStore.
    if (!(store instanceof HMobStore)) {
      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
    }
    this.mobStore = (HMobStore) store;
    this.mobSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold();
    this.ioOptimizedMode =
      conf.get(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.DEFAULT_MOB_COMPACTION_TYPE)
        .equals(MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
  }

  /**
   * Entry point of a compaction. Records in a thread local whether the request has user priority
   * and, for a user-requested major compaction in I/O optimized mode, gathers the MOB files
   * referenced by the input store files together with their lengths before delegating to the
   * generic compaction routine.
   * @param request              the compaction request
   * @param throughputController the compaction throughput controller
   * @param user                 the user on whose behalf the compaction runs
   * @return paths of the newly written store files
   * @throws IOException if the MOB references of an input file cannot be read, a referenced MOB
   *                     file cannot be found, or the compaction itself fails
   */
  @Override
  public List<Path> compact(CompactionRequestImpl request,
    ThroughputController throughputController, User user) throws IOException {
    String tableName = store.getTableName().toString();
    String regionName = store.getRegionInfo().getRegionNameAsString();
    String familyName = store.getColumnFamilyName();
    LOG.info(
      "MOB compaction: major={} isAll={} priority={} throughput controller={}"
        + " table={} cf={} region={}",
      request.isMajor(), request.isAllFiles(), request.getPriority(), throughputController,
      tableName, familyName, regionName);
    boolean userPriority = request.getPriority() == HStore.PRIORITY_USER;
    userRequest.set(userPriority ? Boolean.TRUE : Boolean.FALSE);
    LOG.debug("MOB compaction table={} cf={} region={} files: {}", tableName, familyName,
      regionName, request.getFiles());
    // Check if I/O optimized MOB compaction
    if (ioOptimizedMode && request.isMajor() && userPriority) {
      try {
        // Union of the MOB file references stored in the metadata of every input store file.
        final SetMultimap<TableName, String> mobRefs = request.getFiles().stream().map(file -> {
          byte[] value = file.getMetadataValue(HStoreFile.MOB_FILE_REFS);
          ImmutableSetMultimap.Builder<TableName, String> builder;
          if (value == null) {
            builder = ImmutableSetMultimap.builder();
          } else {
            try {
              builder = MobUtils.deserializeMobFileRefs(value);
            } catch (RuntimeException exception) {
              throw new RuntimeException("failure getting mob references for hfile " + file,
                exception);
            }
          }
          return builder;
        }).reduce((a, b) -> a.putAll(b.build())).orElseGet(ImmutableSetMultimap::builder).build();
        // reset disableIO
        disableIO.set(Boolean.FALSE);
        if (!mobRefs.isEmpty()) {
          calculateMobLengthMap(mobRefs);
        }
        LOG.info(
          "Table={} cf={} region={}. I/O optimized MOB compaction. "
            + "Total referenced MOB files: {}",
          tableName, familyName, regionName, mobRefs.size());
      } catch (RuntimeException exception) {
        throw new IOException("Failed to get list of referenced hfiles for request " + request,
          exception);
      }
    }

    return compact(request, scannerFactory, writerFactory, throughputController, user);
  }

  /**
   * Fills the thread-local {@link #mobLengthMap} with the length of every referenced MOB file and
   * sets {@link #disableIO} when a reference uses the old MOB file naming.
   * @param mobRefs multimap of original table name -> mob hfile
   * @throws FileNotFoundException if a referenced MOB file is in none of the candidate locations
   * @throws IOException           on any other file system failure
   */
  private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throws IOException {
    FileSystem fs = store.getFileSystem();
    HashMap<String, Long> map = mobLengthMap.get();
    map.clear();
    for (Entry<TableName, String> reference : mobRefs.entries()) {
      final TableName table = reference.getKey();
      final String mobfile = reference.getValue();
      if (MobFileName.isOldMobFileName(mobfile)) {
        disableIO.set(Boolean.TRUE);
      }
      List<Path> locations = mobStore.getLocations(table);
      // Probe the candidate locations in order and stop at the first one holding the file.
      for (Path p : locations) {
        try {
          FileStatus st = fs.getFileStatus(new Path(p, mobfile));
          long size = st.getLen();
          LOG.debug("Referenced MOB file={} size={}", mobfile, size);
          map.put(mobfile, size);
          break;
        } catch (FileNotFoundException exception) {
          LOG.debug("Mob file {} was not in location {}. May have other locations to try.", mobfile,
            p);
        }
      }
      if (!map.containsKey(mobfile)) {
        throw new FileNotFoundException("Could not find mob file " + mobfile + " in the list of "
          + "expected locations: " + locations);
      }
    }
  }

  /**
   * Performs compaction on a column family with the mob flag enabled. MOB data itself is
   * re-compacted only when the request covers all files and has user priority ("MOB compaction"
   * below). There are two modes of a MOB compaction:
   * <ul>
   * <li>Full mode - every resolved MOB value above the threshold is rewritten into the MOB output
   * file.</li>
   * <li>I/O optimized mode - for use cases with no or infrequent updates/deletes of MOB data. The
   * main idea is to limit the maximum size of a MOB file produced during compaction and to limit
   * I/O write/read amplification: MOB values whose origin file is already at or above the
   * configured maximum MOB file size are left in place, and the MOB output file is rolled once it
   * grows past that maximum.</li>
   * </ul>
   * During a MOB compaction each cell is handled as follows:
   * <ol>
   * <li>MOB reference cell: the referenced value is resolved. An empty or missing value aborts
   * the compaction unless the unsafe "discard miss" setting is on, in which case the cell is
   * skipped. A value larger than the threshold is written to the MOB output file and a new
   * reference is written to the store file (or, in I/O optimized mode with a large origin file,
   * the existing reference is copied as is). A value not larger than the threshold is written
   * inline to the store file.</li>
   * <li>Any other cell: if its value is larger than the threshold it is written to the MOB output
   * file and a reference is written to the store file; otherwise it is written directly to the
   * store file.</li>
   * </ol>
   * During any other compaction:
   * <ol>
   * <li>Non-Put cells are written directly to the store file.</li>
   * <li>Put MOB reference cells are validated and copied as is; the referenced MOB file is
   * recorded in the reference set.</li>
   * <li>Other Put cells are written directly when not larger than the threshold, otherwise they
   * are written to the MOB output file and a reference is written to the store file.</li>
   * </ol>
   * @param fd                   File details
   * @param scanner              Where to read from.
   * @param writer               Where to write to.
   * @param smallestReadPoint    Smallest read point.
   * @param cleanSeqId           When true, remove seqId(used to be mvcc) value which is &lt;=
   *                             smallestReadPoint
   * @param throughputController The compaction throughput controller.
   * @param request              compaction request.
   * @param progress             Progress reporter.
   * @return Whether compaction ended; false if it was interrupted for any reason.
   * @throws IOException if reading, writing or resolving a MOB cell fails, or if throughput
   *                     control is interrupted (as {@link InterruptedIOException})
   */
  @Override
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Clear old mob references
    mobRefSet.get().clear();
    boolean isUserRequest = userRequest.get();
    boolean major = request.isAllFiles();
    boolean compactMOBs = major && isUserRequest;
    boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
      MobConstants.DEFAULT_MOB_DISCARD_MISS);
    if (discardMobMiss) {
      LOG.warn("{}=true. This is unsafe setting recommended only when first upgrading to a version"
        + " with the distributed mob compaction feature on a cluster that has experienced MOB data "
        + "corruption.", MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY);
    }
    long maxMobFileSize = conf.getLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY,
      MobConstants.DEFAULT_MOB_COMPACTION_MAX_FILE_SIZE);
    // Effective mode for this run: configured AND not disabled by the thread-local flag.
    boolean ioOptimized = this.ioOptimizedMode && !disableIO.get();
    LOG.info(
      "Compact MOB={} optimized configured={} optimized enabled={} maximum MOB file size={}"
        + " major={} store={}",
      compactMOBs, this.ioOptimizedMode, ioOptimized, maxMobFileSize, major, getStoreInfo());
    List<ExtendedCell> cells = new ArrayList<>();
    long currentTime = EnvironmentEdgeManager.currentTime();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = currentTime;
    }
    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    byte[] fileName = null;
    StoreFileWriter mobFileWriter = null;
    /*
     * mobCells are used only to decide if we need to commit or abort current MOB output file.
     */
    long mobCells = 0;
    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
    boolean finished = false;

    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
      .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
        compactScannerSizeLimit)
      .build();
    throughputController.start(compactionName);
    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
    long shippedCallSizeLimit =
      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();

    ExtendedCell mobCell = null;
    List<String> committedMobWriterFileNames = new ArrayList<>();
    try {

      mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
      fileName = Bytes.toBytes(mobFileWriter.getPath().getName());

      // Since scanner.next() can return 'false' but still be delivering data,
      // we have to use a do/while loop.
      do {
        hasMore = scanner.next(cells, scannerContext);
        currentTime = EnvironmentEdgeManager.currentTime();
        if (LOG.isDebugEnabled()) {
          now = currentTime;
        }
        if (closeChecker.isTimeLimit(store, currentTime)) {
          progress.cancel();
          return false;
        }
        for (ExtendedCell c : cells) {
          if (compactMOBs) {
            if (MobUtils.isMobReferenceCell(c)) {
              String fName = MobUtils.getMobFileName(c);
              // Added to support migration
              try {
                mobCell = mobStore.resolve(c, true, false).getCell();
              } catch (DoNotRetryIOException e) {
                if (discardMobMiss && e.getCause() instanceof FileNotFoundException) {
                  LOG.error("Missing MOB cell: file={} not found cell={}", fName, c);
                  continue;
                } else {
                  throw e;
                }
              }

              if (discardMobMiss && mobCell.getValueLength() == 0) {
                LOG.error("Missing MOB cell value: file={} mob cell={} cell={}", fName, mobCell, c);
                continue;
              } else if (mobCell.getValueLength() == 0) {
                String errMsg =
                  String.format("Found 0 length MOB cell in a file=%s mob cell=%s " + " cell=%s",
                    fName, mobCell, c);
                throw new IOException(errMsg);
              }

              if (mobCell.getValueLength() > mobSizeThreshold) {
                // put the mob data back to the MOB store file
                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
                if (!ioOptimized) {
                  mobFileWriter.append(mobCell);
                  mobCells++;
                  writer.append(
                    MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
                } else {
                  // I/O optimized mode
                  // Check if MOB cell origin file size is
                  // greater than threshold
                  Long size = mobLengthMap.get().get(fName);
                  if (size == null) {
                    // FATAL error (we should never get here though), abort compaction
                    // This error means that meta section of store file does not contain
                    // MOB file, which has references in at least one cell from this store file
                    String msg = String.format(
                      "Found an unexpected MOB file during compaction %s, aborting compaction %s",
                      fName, getStoreInfo());
                    throw new IOException(msg);
                  }
                  // Can not be null
                  if (size < maxMobFileSize) {
                    // If MOB cell origin file is below threshold
                    // it is get compacted
                    mobFileWriter.append(mobCell);
                    // Update number of mobCells in a current mob writer
                    mobCells++;
                    writer.append(
                      MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
                    // Update total size of the output (we do not take into account
                    // file compression yet)
                    long len = mobFileWriter.getPos();
                    if (len > maxMobFileSize) {
                      LOG.debug("Closing output MOB File, length={} file={}, store={}", len,
                        mobFileWriter.getPath().getName(), getStoreInfo());
                      mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major,
                        request, committedMobWriterFileNames);
                      fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                      mobCells = 0;
                    }
                  } else {
                    // We leave large MOB file as is (is not compacted),
                    // then we update set of MOB file references
                    // and append mob cell directly to the store's writer
                    Optional<TableName> refTable = MobUtils.getTableName(c);
                    if (refTable.isPresent()) {
                      mobRefSet.get().put(refTable.get(), fName);
                      writer.append(c);
                    } else {
                      throw new IOException(String.format("MOB cell did not contain a tablename "
                        + "tag. should not be possible. see ref guide on mob troubleshooting. "
                        + "store=%s cell=%s", getStoreInfo(), c));
                    }
                  }
                }
              } else {
                // If MOB value is less than threshold, append it directly to a store file
                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
                writer.append(mobCell);
                cellsCountCompactedFromMob++;
                cellsSizeCompactedFromMob += mobCell.getValueLength();
              }
            } else {
              // Not a MOB reference cell
              int size = c.getValueLength();
              if (size > mobSizeThreshold) {
                // This MOB cell comes from a regular store file
                // therefore we store it into original mob output
                mobFileWriter.append(c);
                writer
                  .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()));
                mobCells++;
                cellsCountCompactedToMob++;
                cellsSizeCompactedToMob += c.getValueLength();
                if (ioOptimized) {
                  // Update total size of the output (we do not take into account
                  // file compression yet)
                  long len = mobFileWriter.getPos();
                  if (len > maxMobFileSize) {
                    mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major,
                      request, committedMobWriterFileNames);
                    fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                    mobCells = 0;
                  }
                }
              } else {
                // Not a MOB cell, write it directly to a store file
                writer.append(c);
              }
            }
          } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) {
            // Not a major compaction or major with MOB disabled
            // If the kv type is not put, directly write the cell
            // to the store file.
            writer.append(c);
          } else if (MobUtils.isMobReferenceCell(c)) {
            // Not a major MOB compaction, Put MOB reference
            if (MobUtils.hasValidMobRefCellValue(c)) {
              // We do not check mobSizeThreshold during normal compaction,
              // leaving it to a MOB compaction run
              Optional<TableName> refTable = MobUtils.getTableName(c);
              if (refTable.isPresent()) {
                mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
                writer.append(c);
              } else {
                throw new IOException(String.format("MOB cell did not contain a tablename "
                  + "tag. should not be possible. see ref guide on mob troubleshooting. "
                  + "store=%s cell=%s", getStoreInfo(), c));
              }
            } else {
              String errMsg = String.format("Corrupted MOB reference: %s", c.toString());
              throw new IOException(errMsg);
            }
          } else if (c.getValueLength() <= mobSizeThreshold) {
            // If the value size of a cell is not larger than the threshold, directly write it to
            // the store file.
            writer.append(c);
          } else {
            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
            // write this cell to a mob file, and write the path to the store file.
            mobCells++;
            // append the original keyValue in the mob file.
            mobFileWriter.append(c);
            ExtendedCell reference =
              MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags());
            // write the cell whose value is the path of a mob file to the store file.
            writer.append(reference);
            cellsCountCompactedToMob++;
            cellsSizeCompactedToMob += c.getValueLength();
            if (ioOptimized) {
              long len = mobFileWriter.getPos();
              if (len > maxMobFileSize) {
                mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major, request,
                  committedMobWriterFileNames);
                fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                mobCells = 0;
              }
            }
          }

          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          if (closeChecker.isSizeLimit(store, len)) {
            progress.cancel();
            return false;
          }
          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
            ((ShipperListener) writer).beforeShipped();
            kvs.shipped();
            bytesWrittenProgressForShippedCall = 0;
          }
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
      // Commit last MOB writer
      commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
      finished = true;
    } catch (InterruptedException e) {
      progress.cancel();
      // InterruptedIOException has no cause-taking constructor; attach the cause explicitly so
      // the original interruption is not lost.
      InterruptedIOException interrupted = new InterruptedIOException(
        "Interrupted while control throughput of compacting " + compactionName);
      interrupted.initCause(e);
      throw interrupted;
    } catch (IOException t) {
      String msg = "Mob compaction failed for region: " + store.getRegionInfo().getEncodedName();
      throw new IOException(msg, t);
    } finally {
      // Clone last cell in the final because writer will append last cell when committing. If
      // don't clone here and once the scanner get closed, then the memory of last cell will be
      // released. (HBASE-22582)
      ((ShipperListener) writer).beforeShipped();
      throughputController.finish(compactionName);
      if (!finished && mobFileWriter != null) {
        // Remove all MOB references because compaction failed
        clearThreadLocals();
        // Abort writer
        LOG.debug("Aborting writer for {} because of a compaction failure, Store {}",
          mobFileWriter.getPath(), getStoreInfo());
        abortWriter(mobFileWriter);
        deleteCommittedMobFiles(committedMobWriterFileNames);
      }
    }

    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
    progress.complete();
    return true;
  }

  /**
   * Returns a short description of the store (table, family, encoded region name) for log and
   * error messages.
   */
  protected String getStoreInfo() {
    return String.format("[table=%s family=%s region=%s]", store.getTableName().getNameAsString(),
      store.getColumnFamilyName(), store.getRegionInfo().getEncodedName());
  }

  /** Empties the thread-local MOB reference set and MOB file length map. */
  private void clearThreadLocals() {
    mobRefSet.get().clear();
    HashMap<String, Long> map = mobLengthMap.get();
    if (map != null) {
      map.clear();
    }
  }

  /**
   * Creates a new MOB output writer and registers its file name in the thread-local reference set.
   * @param fd                    file details of the compaction input
   * @param major                 selects the major or minor compaction compression
   * @param writerCreationTracker passed to the store when the writer is not created in the tmp dir
   * @return the new MOB file writer
   * @throws IOException if the writer cannot be created
   */
  private StoreFileWriter newMobWriter(FileDetails fd, boolean major,
    Consumer<Path> writerCreationTracker) throws IOException {
    try {
      StoreFileWriter mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst()
        ? mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
          major ? majorCompactionCompression : minorCompactionCompression,
          store.getRegionInfo().getStartKey(), true)
        : mobStore.createWriter(new Date(fd.latestPutTs), fd.maxKeyCount,
          major ? majorCompactionCompression : minorCompactionCompression,
          store.getRegionInfo().getStartKey(), true, writerCreationTracker);
      LOG.debug("New MOB writer created={} store={}", mobFileWriter.getPath().getName(),
        getStoreInfo());
      // Add reference we get for compact MOB
      mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
      return mobFileWriter;
    } catch (IOException e) {
      // Bailing out
      throw new IOException(String.format("Failed to create mob writer, store=%s", getStoreInfo()),
        e);
    }
  }

  /**
   * Commits the MOB writer when it received at least one cell, otherwise aborts it and removes its
   * file name from the thread-local reference set. Does nothing but log when the writer is null.
   * @param mobFileWriter the MOB writer to finish; may be null
   * @param maxSeqId      maximum sequence id recorded in the MOB file metadata
   * @param mobCells      number of cells appended to this writer
   * @param major         recorded in the MOB file metadata
   * @throws IOException if closing or committing the MOB file fails
   */
  private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId, long mobCells,
    boolean major) throws IOException {
    // Commit or abort major mob writer
    // If IOException happens during below operation, some
    // MOB files can be committed partially, but corresponding
    // store file won't be committed, therefore these MOB files
    // become orphans and will be deleted during next MOB cleaning chore cycle

    if (mobFileWriter == null) {
      LOG.debug("Mob file writer is null, skipping commit/abort, store={}", getStoreInfo());
      return;
    }
    LOG.debug("Commit or abort size={} mobCells={} major={} file={}, store={}",
      mobFileWriter.getPos(), mobCells, major, mobFileWriter.getPath().getName(), getStoreInfo());
    if (mobCells > 0) {
      // If the mob file is not empty, commit it.
      Path path =
        MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
      mobFileWriter.appendMetadata(maxSeqId, major, mobCells);
      mobFileWriter.close();
      mobStore.commitFile(mobFileWriter.getPath(), path);
    } else {
      // If the mob file is empty, delete it instead of committing.
      LOG.debug("Aborting writer for {} because there are no MOB cells, store={}",
        mobFileWriter.getPath(), getStoreInfo());
      // Remove MOB file from reference set
      mobRefSet.get().remove(store.getTableName(), mobFileWriter.getPath().getName());
      abortWriter(mobFileWriter);
    }
  }

  /**
   * Finishes the store file: appends the regular and MOB reference metadata, closes the writer and
   * clears the thread-local compaction state.
   * @return a single-element list holding the path of the written store file
   */
  @Override
  protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
    CompactionRequestImpl request) throws IOException {
    List<Path> newFiles = Lists.newArrayList(writer.getPath());
    writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
    writer.appendMobMetadata(mobRefSet.get());
    writer.close();
    clearThreadLocals();
    return newFiles;
  }

  /**
   * Finishes the current MOB writer, remembers its file name so it can be deleted if the
   * compaction later fails, and opens a fresh MOB writer.
   * @return the new MOB file writer
   */
  private StoreFileWriter switchToNewMobWriter(StoreFileWriter mobFileWriter, FileDetails fd,
    long mobCells, boolean major, CompactionRequestImpl request,
    List<String> committedMobWriterFileNames) throws IOException {
    commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
    committedMobWriterFileNames.add(mobFileWriter.getPath().getName());
    return newMobWriter(fd, major, request.getWriterCreationTracker());
  }

  /**
   * Best-effort removal of MOB files committed earlier in a compaction that failed. Failures to
   * delete are logged and otherwise ignored.
   * @param fileNames names of MOB files under this store's MOB family directory
   */
  private void deleteCommittedMobFiles(List<String> fileNames) {
    if (fileNames.isEmpty()) {
      return;
    }
    Path mobColumnFamilyPath =
      MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
    for (String fileName : fileNames) {
      if (fileName == null) {
        continue;
      }
      Path path = new Path(mobColumnFamilyPath, fileName);
      try {
        if (store.getFileSystem().exists(path)) {
          store.getFileSystem().delete(path, false);
        }
      } catch (IOException e) {
        LOG.warn("Failed to delete the mob file {} for an failed mob compaction.", path, e);
      }
    }
  }

}