/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob;

import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

/**
 * Compact passed set of files in the mob-enabled column family.
 * <p>
 * State that has to travel between {@link #compact}, {@link #performCompaction} and
 * {@link #commitWriter} is kept in static thread-local variables ({@link #mobRefSet},
 * {@link #userRequest}, {@link #disableIO}, {@link #mobLengthMap}); they are populated and cleared
 * by the methods of this class.
 */
@InterfaceAudience.Private
public class DefaultMobStoreCompactor extends DefaultCompactor {

  private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreCompactor.class);
  /** Value-length threshold read from the column family descriptor (see constructor). */
  protected long mobSizeThreshold;
  /** The store being compacted, cast to {@link HMobStore}. */
  protected HMobStore mobStore;
  /** True when the configured MOB compaction type is the "optimized" one. */
  protected boolean ioOptimizedMode = false;

  /*
   * MOB file reference set thread local variable. It contains set of a MOB file names, which newly
   * compacted store file has references to. This variable is populated during compaction and the
   * content of it is written into meta section of a newly created store file at the final step of
   * compaction process.
   */
  static ThreadLocal<SetMultimap<TableName, String>> mobRefSet =
    ThreadLocal.withInitial(HashMultimap::create);

  /*
   * Is it user or system-originated request.
   */
  static ThreadLocal<Boolean> userRequest = ThreadLocal.withInitial(() -> Boolean.FALSE);

  /*
   * Disable IO mode. IO mode can be forcefully disabled if compactor finds old MOB file
   * (pre-distributed compaction). This means that migration has not been completed yet. During data
   * migration (upgrade) process only general compaction is allowed.
   */
  static ThreadLocal<Boolean> disableIO = ThreadLocal.withInitial(() -> Boolean.FALSE);

  /*
   * Map : MOB file name - file length. Can be expensive for large amount of MOB files.
   */
  static ThreadLocal<HashMap<String, Long>> mobLengthMap = ThreadLocal.withInitial(HashMap::new);

  /** Scanner factory handed to the generic compaction driver in {@link #compact}. */
  private final InternalScannerFactory scannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
      // Deletes are dropped only when the request covers all files.
      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
      return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
        fd.earliestPutTs);
    }
  };

  /** Writer factory handed to the generic compaction driver in {@link #compact}. */
  private final CellSinkFactory<StoreFileWriter> writerFactory =
    new CellSinkFactory<StoreFileWriter>() {
      @Override
      public StoreFileWriter createWriter(InternalScanner scanner,
        org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
        boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
        throws IOException {
        // make this writer with tags always because of possible new cells with tags.
        return store.getStoreEngine()
          .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker)
            .includeMVCCReadpoint(true).includesTag(true));
      }
    };

  /**
   * Creates a compactor bound to a MOB-enabled store.
   * @param conf  configuration; the MOB compaction type is read from it
   * @param store the store to compact; must be an {@link HMobStore}
   * @throws IllegalArgumentException if {@code store} is not an {@link HMobStore}
   */
  public DefaultMobStoreCompactor(Configuration conf, HStore store) {
    super(conf, store);
    // The mob cells reside in the mob-enabled column family which is held by HMobStore.
    // During the compaction, the compactor reads the cells from the mob files and
    // probably creates new mob files. All of these operations are included in HMobStore,
    // so we need to cast the Store to HMobStore.
    if (!(store instanceof HMobStore)) {
      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
    }
    this.mobStore = (HMobStore) store;
    this.mobSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold();
    this.ioOptimizedMode =
      conf.get(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.DEFAULT_MOB_COMPACTION_TYPE)
        .equals(MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);

  }

  /**
   * Entry point of a compaction. Records in {@link #userRequest} whether the request has user
   * priority and, for a user-priority major request in I/O optimized mode, collects the MOB file
   * references stored in the metadata of the input files and computes {@link #mobLengthMap} from
   * them. It then delegates to the generic compaction driver.
   * @param request              compaction request
   * @param throughputController compaction throughput controller
   * @param user                 the user on whose behalf the compaction runs
   * @return paths returned by the delegated compaction
   * @throws IOException if the MOB references cannot be read or the delegated compaction fails
   */
  @Override
  public List<Path> compact(CompactionRequestImpl request,
    ThroughputController throughputController, User user) throws IOException {
    String tableName = store.getTableName().toString();
    String regionName = store.getRegionInfo().getRegionNameAsString();
    String familyName = store.getColumnFamilyName();
    LOG.info(
      "MOB compaction: major={} isAll={} priority={} throughput controller={}"
        + " table={} cf={} region={}",
      request.isMajor(), request.isAllFiles(), request.getPriority(), throughputController,
      tableName, familyName, regionName);
    if (request.getPriority() == HStore.PRIORITY_USER) {
      userRequest.set(Boolean.TRUE);
    } else {
      userRequest.set(Boolean.FALSE);
    }
    LOG.debug("MOB compaction table={} cf={} region={} files: {}", tableName, familyName,
      regionName, request.getFiles());
    // Check if I/O optimized MOB compaction
    if (ioOptimizedMode) {
      if (request.isMajor() && request.getPriority() == HStore.PRIORITY_USER) {
        try {
          // Merge the MOB file references recorded in every input file into one multimap.
          final SetMultimap<TableName, String> mobRefs = request.getFiles().stream().map(file -> {
            byte[] value = file.getMetadataValue(HStoreFile.MOB_FILE_REFS);
            ImmutableSetMultimap.Builder<TableName, String> builder;
            if (value == null) {
              builder = ImmutableSetMultimap.builder();
            } else {
              try {
                builder = MobUtils.deserializeMobFileRefs(value);
              } catch (RuntimeException exception) {
                throw new RuntimeException("failure getting mob references for hfile " + file,
                  exception);
              }
            }
            return builder;
          }).reduce((a, b) -> a.putAll(b.build())).orElseGet(ImmutableSetMultimap::builder).build();
          // reset disableIO
          disableIO.set(Boolean.FALSE);
          if (!mobRefs.isEmpty()) {
            calculateMobLengthMap(mobRefs);
          }
          LOG.info(
            "Table={} cf={} region={}. I/O optimized MOB compaction. "
              + "Total referenced MOB files: {}",
            tableName, familyName, regionName, mobRefs.size());
        } catch (RuntimeException exception) {
          throw new IOException("Failed to get list of referenced hfiles for request " + request,
            exception);
        }
      }
    }

    return compact(request, scannerFactory, writerFactory, throughputController, user);
  }

  /**
   * Rebuilds {@link #mobLengthMap} for the current thread: for every referenced MOB file the
   * candidate locations returned by the MOB store are probed in order and the length of the first
   * one found is recorded. Sets {@link #disableIO} when a reference uses an old-style MOB file name.
   * @param mobRefs multimap of original table name -> mob hfile
   * @throws FileNotFoundException if a referenced MOB file is in none of the candidate locations
   * @throws IOException           on other file system failures
   */
  private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throws IOException {
    FileSystem fs = store.getFileSystem();
    HashMap<String, Long> map = mobLengthMap.get();
    map.clear();
    for (Map.Entry<TableName, String> reference : mobRefs.entries()) {
      final TableName table = reference.getKey();
      final String mobfile = reference.getValue();
      if (MobFileName.isOldMobFileName(mobfile)) {
        disableIO.set(Boolean.TRUE);
      }
      List<Path> locations = mobStore.getLocations(table);
      for (Path p : locations) {
        try {
          FileStatus st = fs.getFileStatus(new Path(p, mobfile));
          long size = st.getLen();
          LOG.debug("Referenced MOB file={} size={}", mobfile, size);
          map.put(mobfile, size);
          break;
        } catch (FileNotFoundException exception) {
          LOG.debug("Mob file {} was not in location {}. May have other locations to try.", mobfile,
            p);
        }
      }
      if (!map.containsKey(mobfile)) {
        throw new FileNotFoundException("Could not find mob file " + mobfile + " in the list of "
          + "expected locations: " + locations);
      }

    }
  }

  /**
   * Performs compaction on a column family with the mob flag enabled. This works only when MOB
   * compaction is explicitly requested (by User), or by Master. There are two modes of a MOB
   * compaction:<br>
   * <p>
   * <ul>
   * <li>1. Full mode - when all MOB data for a region is compacted into a single MOB file.
   * <li>2. I/O optimized mode - for use cases with no or infrequent updates/deletes of a <br>
   * MOB data. The main idea behind i/o optimized compaction is to limit maximum size of a MOB file
   * produced during compaction and to limit I/O write/read amplification.
   * </ul>
   * The basic algorithm of compaction is the following: <br>
   * 1. If the Put cell has a mob reference tag, the cell's value is the path of the mob file.
   * <ol>
   * <li>If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
   * directly copy the (with mob tag) cell into the new store file.</li>
   * <li>Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into the
   * new store file.</li>
   * </ol>
   * 2. If the Put cell doesn't have a reference tag.
   * <ol>
   * <li>If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
   * write this cell to a mob file, and write the path of this mob file to the store file.</li>
   * <li>Otherwise, directly write this cell into the store file.</li>
   * </ol>
   * @param fd                   File details
   * @param scanner              Where to read from.
   * @param writer               Where to write to.
   * @param smallestReadPoint    Smallest read point.
   * @param cleanSeqId           When true, remove seqId(used to be mvcc) value which is &lt;=
   *                             smallestReadPoint
   * @param throughputController The compaction throughput controller.
   * @param request              compaction request.
   * @param progress             Progress reporter.
   * @return Whether compaction ended; false if it was interrupted for any reason.
   * @throws InterruptedIOException if throughput control is interrupted; the original
   *                                {@link InterruptedException} is attached as the cause
   * @throws IOException            on any other failure; the MOB writer is aborted and MOB files
   *                                already committed by this run are deleted
   */
  @Override
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Clear old mob references
    mobRefSet.get().clear();
    boolean isUserRequest = userRequest.get();
    boolean major = request.isAllFiles();
    // MOB data itself is rewritten only for an all-files compaction requested with user priority.
    boolean compactMOBs = major && isUserRequest;
    boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
      MobConstants.DEFAULT_MOB_DISCARD_MISS);
    if (discardMobMiss) {
      LOG.warn("{}=true. This is unsafe setting recommended only when first upgrading to a version"
        + " with the distributed mob compaction feature on a cluster that has experienced MOB data "
        + "corruption.", MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY);
    }
    long maxMobFileSize = conf.getLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY,
      MobConstants.DEFAULT_MOB_COMPACTION_MAX_FILE_SIZE);
    // Effective mode for this run: configured mode unless disabled via the disableIO thread local.
    // Note that this local intentionally shadows the field of the same name.
    boolean ioOptimizedMode = this.ioOptimizedMode && !disableIO.get();
    LOG.info(
      "Compact MOB={} optimized configured={} optimized enabled={} maximum MOB file size={}"
        + " major={} store={}",
      compactMOBs, this.ioOptimizedMode, ioOptimizedMode, maxMobFileSize, major, getStoreInfo());
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> cells = new ArrayList<>();
    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
    long currentTime = EnvironmentEdgeManager.currentTime();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = currentTime;
    }
    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    byte[] fileName = null;
    StoreFileWriter mobFileWriter = null;
    /*
     * mobCells are used only to decide if we need to commit or abort current MOB output file.
     */
    long mobCells = 0;
    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
    boolean finished = false;

    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
      .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
        compactScannerSizeLimit)
      .build();
    throughputController.start(compactionName);
    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
    long shippedCallSizeLimit =
      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();

    Cell mobCell = null;
    // Names of MOB files already committed by this run; deleted again if the run fails.
    List<String> committedMobWriterFileNames = new ArrayList<>();
    try {

      mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
      fileName = Bytes.toBytes(mobFileWriter.getPath().getName());

      do {
        hasMore = scanner.next(cells, scannerContext);
        now = EnvironmentEdgeManager.currentTime();
        for (Cell c : cells) {
          if (compactMOBs) {
            if (MobUtils.isMobReferenceCell(c)) {
              String fName = MobUtils.getMobFileName(c);
              // Added to support migration
              try {
                mobCell = mobStore.resolve(c, true, false).getCell();
              } catch (FileNotFoundException fnfe) {
                if (discardMobMiss) {
                  LOG.error("Missing MOB cell: file={} not found cell={}", fName, c);
                  continue;
                } else {
                  throw fnfe;
                }
              }

              if (discardMobMiss && mobCell.getValueLength() == 0) {
                LOG.error("Missing MOB cell value: file={} mob cell={} cell={}", fName, mobCell, c);
                continue;
              } else if (mobCell.getValueLength() == 0) {
                String errMsg =
                  String.format("Found 0 length MOB cell in a file=%s mob cell=%s " + " cell=%s",
                    fName, mobCell, c);
                throw new IOException(errMsg);
              }

              if (mobCell.getValueLength() > mobSizeThreshold) {
                // put the mob data back to the MOB store file
                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
                if (!ioOptimizedMode) {
                  mobFileWriter.append(mobCell);
                  mobCells++;
                  writer.append(
                    MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
                } else {
                  // I/O optimized mode
                  // Check if MOB cell origin file size is
                  // greater than threshold
                  Long size = mobLengthMap.get().get(fName);
                  if (size == null) {
                    // FATAL error (we should never get here though), abort compaction
                    // This error means that meta section of store file does not contain
                    // MOB file, which has references in at least one cell from this store file
                    String msg = String.format(
                      "Found an unexpected MOB file during compaction %s, aborting compaction %s",
                      fName, getStoreInfo());
                    throw new IOException(msg);
                  }
                  // Can not be null
                  if (size < maxMobFileSize) {
                    // If MOB cell origin file is below threshold
                    // it is get compacted
                    mobFileWriter.append(mobCell);
                    // Update number of mobCells in a current mob writer
                    mobCells++;
                    writer.append(
                      MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
                    // Update total size of the output (we do not take into account
                    // file compression yet)
                    long len = mobFileWriter.getPos();
                    if (len > maxMobFileSize) {
                      LOG.debug("Closing output MOB File, length={} file={}, store={}", len,
                        mobFileWriter.getPath().getName(), getStoreInfo());
                      mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major,
                        request, committedMobWriterFileNames);
                      fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                      mobCells = 0;
                    }
                  } else {
                    // We leave large MOB file as is (is not compacted),
                    // then we update set of MOB file references
                    // and append mob cell directly to the store's writer
                    Optional<TableName> refTable = MobUtils.getTableName(c);
                    if (refTable.isPresent()) {
                      mobRefSet.get().put(refTable.get(), fName);
                      writer.append(c);
                    } else {
                      throw new IOException(String.format("MOB cell did not contain a tablename "
                        + "tag. should not be possible. see ref guide on mob troubleshooting. "
                        + "store=%s cell=%s", getStoreInfo(), c));
                    }
                  }
                }
              } else {
                // If MOB value is less than threshold, append it directly to a store file
                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
                writer.append(mobCell);
                cellsCountCompactedFromMob++;
                cellsSizeCompactedFromMob += mobCell.getValueLength();
              }
            } else {
              // Not a MOB reference cell
              int size = c.getValueLength();
              if (size > mobSizeThreshold) {
                // This MOB cell comes from a regular store file
                // therefore we store it into original mob output
                mobFileWriter.append(c);
                writer
                  .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()));
                mobCells++;
                cellsCountCompactedToMob++;
                cellsSizeCompactedToMob += c.getValueLength();
                if (ioOptimizedMode) {
                  // Update total size of the output (we do not take into account
                  // file compression yet)
                  long len = mobFileWriter.getPos();
                  if (len > maxMobFileSize) {
                    mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major,
                      request, committedMobWriterFileNames);
                    fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                    mobCells = 0;
                  }
                }
              } else {
                // Not a MOB cell, write it directly to a store file
                writer.append(c);
              }
            }
          } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) {
            // Not a major compaction or major with MOB disabled
            // If the kv type is not put, directly write the cell
            // to the store file.
            writer.append(c);
          } else if (MobUtils.isMobReferenceCell(c)) {
            // Not a major MOB compaction, Put MOB reference
            if (MobUtils.hasValidMobRefCellValue(c)) {
              // We do not check mobSizeThreshold during normal compaction,
              // leaving it to a MOB compaction run
              Optional<TableName> refTable = MobUtils.getTableName(c);
              if (refTable.isPresent()) {
                mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
                writer.append(c);
              } else {
                throw new IOException(String.format("MOB cell did not contain a tablename "
                  + "tag. should not be possible. see ref guide on mob troubleshooting. "
                  + "store=%s cell=%s", getStoreInfo(), c));
              }
            } else {
              String errMsg = String.format("Corrupted MOB reference: %s", c.toString());
              throw new IOException(errMsg);
            }
          } else if (c.getValueLength() <= mobSizeThreshold) {
            // If the value size of a cell is not larger than the threshold, directly write it to
            // the store file.
            writer.append(c);
          } else {
            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
            // write this cell to a mob file, and write the path to the store file.
            mobCells++;
            // append the original keyValue in the mob file.
            mobFileWriter.append(c);
            Cell reference = MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags());
            // write the cell whose value is the path of a mob file to the store file.
            writer.append(reference);
            cellsCountCompactedToMob++;
            cellsSizeCompactedToMob += c.getValueLength();
            if (ioOptimizedMode) {
              long len = mobFileWriter.getPos();
              if (len > maxMobFileSize) {
                mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major, request,
                  committedMobWriterFileNames);
                fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                mobCells = 0;
              }
            }
          }

          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          if (closeChecker.isSizeLimit(store, len)) {
            // 'finished' is still false here, so the finally block aborts the MOB writer.
            progress.cancel();
            return false;
          }
          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
            ((ShipperListener) writer).beforeShipped();
            kvs.shipped();
            bytesWrittenProgressForShippedCall = 0;
          }
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
      // Commit last MOB writer
      commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
      finished = true;
    } catch (InterruptedException e) {
      progress.cancel();
      // InterruptedIOException has no cause-taking constructor; attach the cause explicitly so
      // the original interruption is not lost from the stack trace.
      InterruptedIOException iioe = new InterruptedIOException(
        "Interrupted while control throughput of compacting " + compactionName);
      iioe.initCause(e);
      throw iioe;
    } catch (IOException t) {
      String msg = "Mob compaction failed for region: " + store.getRegionInfo().getEncodedName();
      throw new IOException(msg, t);
    } finally {
      // Clone last cell in the final because writer will append last cell when committing. If
      // don't clone here and once the scanner get closed, then the memory of last cell will be
      // released. (HBASE-22582)
      ((ShipperListener) writer).beforeShipped();
      throughputController.finish(compactionName);
      if (!finished && mobFileWriter != null) {
        // Remove all MOB references because compaction failed
        clearThreadLocals();
        // Abort writer
        LOG.debug("Aborting writer for {} because of a compaction failure, Store {}",
          mobFileWriter.getPath(), getStoreInfo());
        abortWriter(mobFileWriter);
        deleteCommittedMobFiles(committedMobWriterFileNames);
      }
    }

    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
    progress.complete();
    return true;
  }

  /**
   * Returns a short description of the store (table, family, encoded region name) for log and
   * error messages.
   */
  protected String getStoreInfo() {
    return String.format("[table=%s family=%s region=%s]", store.getTableName().getNameAsString(),
      store.getColumnFamilyName(), store.getRegionInfo().getEncodedName());
  }

  /** Empties the current thread's {@link #mobRefSet} and {@link #mobLengthMap}. */
  private void clearThreadLocals() {
    mobRefSet.get().clear();
    HashMap<String, Long> map = mobLengthMap.get();
    if (map != null) {
      map.clear();
    }
  }

  /**
   * Creates a new MOB file writer (in the tmp directory when the store engine requires it) and
   * registers the new file name in {@link #mobRefSet}.
   * @param fd                    file details of the compaction
   * @param major                 selects the major or minor compaction compression
   * @param writerCreationTracker passed through when the writer is not created in the tmp directory
   * @return the new writer
   * @throws IOException if the writer cannot be created (the original failure is the cause)
   */
  private StoreFileWriter newMobWriter(FileDetails fd, boolean major,
    Consumer<Path> writerCreationTracker) throws IOException {
    try {
      StoreFileWriter mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst()
        ? mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
          major ? majorCompactionCompression : minorCompactionCompression,
          store.getRegionInfo().getStartKey(), true)
        : mobStore.createWriter(new Date(fd.latestPutTs), fd.maxKeyCount,
          major ? majorCompactionCompression : minorCompactionCompression,
          store.getRegionInfo().getStartKey(), true, writerCreationTracker);
      LOG.debug("New MOB writer created={} store={}", mobFileWriter.getPath().getName(),
        getStoreInfo());
      // Add reference we get for compact MOB
      mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
      return mobFileWriter;
    } catch (IOException e) {
      // Bailing out
      throw new IOException(String.format("Failed to create mob writer, store=%s", getStoreInfo()),
        e);
    }
  }

  /**
   * Commits the MOB writer when it received at least one cell; otherwise aborts it and removes its
   * file name from {@link #mobRefSet}. Does nothing but log when the writer is {@code null}.
   * @param mobFileWriter writer to finish, may be {@code null}
   * @param maxSeqId      max sequence id written into the MOB file metadata
   * @param mobCells      number of cells appended to this writer
   * @param major         major-compaction flag written into the MOB file metadata
   * @throws IOException if writing metadata, closing or committing the file fails
   */
  private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId, long mobCells,
    boolean major) throws IOException {
    // Commit or abort major mob writer
    // If IOException happens during below operation, some
    // MOB files can be committed partially, but corresponding
    // store file won't be committed, therefore these MOB files
    // become orphans and will be deleted during next MOB cleaning chore cycle

    if (mobFileWriter != null) {
      LOG.debug("Commit or abort size={} mobCells={} major={} file={}, store={}",
        mobFileWriter.getPos(), mobCells, major, mobFileWriter.getPath().getName(), getStoreInfo());
      Path path =
        MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
      if (mobCells > 0) {
        // If the mob file is not empty, commit it.
        mobFileWriter.appendMetadata(maxSeqId, major, mobCells);
        mobFileWriter.close();
        mobStore.commitFile(mobFileWriter.getPath(), path);
      } else {
        // If the mob file is empty, delete it instead of committing.
        LOG.debug("Aborting writer for {} because there are no MOB cells, store={}",
          mobFileWriter.getPath(), getStoreInfo());
        // Remove MOB file from reference set
        mobRefSet.get().remove(store.getTableName(), mobFileWriter.getPath().getName());
        abortWriter(mobFileWriter);
      }
    } else {
      // The '{}' placeholder is required, otherwise the store info argument is silently dropped.
      LOG.debug("Mob file writer is null, skipping commit/abort, store={}", getStoreInfo());
    }
  }

  /**
   * Finishes the store file: appends the regular metadata and the collected MOB file references,
   * closes the writer and clears the thread-local state.
   * @return a list containing the path of the written store file
   */
  @Override
  protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
    CompactionRequestImpl request) throws IOException {
    List<Path> newFiles = Lists.newArrayList(writer.getPath());
    writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
    writer.appendMobMetadata(mobRefSet.get());
    writer.close();
    clearThreadLocals();
    return newFiles;
  }

  /**
   * Finishes the current MOB writer (commit or abort), remembers its file name in
   * {@code committedMobWriterFileNames} and opens a fresh MOB writer.
   * @return the newly created MOB writer
   */
  private StoreFileWriter switchToNewMobWriter(StoreFileWriter mobFileWriter, FileDetails fd,
    long mobCells, boolean major, CompactionRequestImpl request,
    List<String> committedMobWriterFileNames) throws IOException {
    commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
    committedMobWriterFileNames.add(mobFileWriter.getPath().getName());
    return newMobWriter(fd, major, request.getWriterCreationTracker());
  }

  /**
   * Best-effort removal of the named files from the MOB family directory of this store; a failure
   * to delete a file is logged and does not stop the loop.
   * @param fileNames MOB file names to delete; {@code null} entries are skipped
   */
  private void deleteCommittedMobFiles(List<String> fileNames) {
    if (fileNames.isEmpty()) {
      return;
    }
    Path mobColumnFamilyPath =
      MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
    for (String fileName : fileNames) {
      if (fileName == null) {
        continue;
      }
      Path path = new Path(mobColumnFamilyPath, fileName);
      try {
        if (store.getFileSystem().exists(path)) {
          store.getFileSystem().delete(path, false);
        }
      } catch (IOException e) {
        LOG.warn("Failed to delete the mob file {} for an failed mob compaction.", path, e);
      }
    }

  }

}