001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
021import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;
022
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.ArrayList;
027import java.util.Date;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map.Entry;
031import java.util.Optional;
032import java.util.function.Consumer;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.DoNotRetryIOException;
038import org.apache.hadoop.hbase.ExtendedCell;
039import org.apache.hadoop.hbase.KeyValue;
040import org.apache.hadoop.hbase.PrivateCellUtil;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.regionserver.CellSink;
043import org.apache.hadoop.hbase.regionserver.HMobStore;
044import org.apache.hadoop.hbase.regionserver.HStore;
045import org.apache.hadoop.hbase.regionserver.HStoreFile;
046import org.apache.hadoop.hbase.regionserver.InternalScanner;
047import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
048import org.apache.hadoop.hbase.regionserver.ScanInfo;
049import org.apache.hadoop.hbase.regionserver.ScanType;
050import org.apache.hadoop.hbase.regionserver.ScannerContext;
051import org.apache.hadoop.hbase.regionserver.ShipperListener;
052import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
053import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
054import org.apache.hadoop.hbase.regionserver.StoreScanner;
055import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
056import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
057import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
058import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
059import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
060import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
061import org.apache.hadoop.hbase.security.User;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
064import org.apache.yetus.audience.InterfaceAudience;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
069import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
070import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
071import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
072
073/**
074 * Compact passed set of files in the mob-enabled column family.
075 */
076@InterfaceAudience.Private
077public class DefaultMobStoreCompactor extends DefaultCompactor {
078
079  private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreCompactor.class);
080  protected long mobSizeThreshold;
081  protected HMobStore mobStore;
082  protected boolean ioOptimizedMode = false;
083
084  /*
085   * MOB file reference set thread local variable. It contains set of a MOB file names, which newly
086   * compacted store file has references to. This variable is populated during compaction and the
087   * content of it is written into meta section of a newly created store file at the final step of
088   * compaction process.
089   */
090  static ThreadLocal<SetMultimap<TableName, String>> mobRefSet =
091    ThreadLocal.withInitial(HashMultimap::create);
092
093  /*
094   * Is it user or system-originated request.
095   */
096
097  static ThreadLocal<Boolean> userRequest = new ThreadLocal<Boolean>() {
098    @Override
099    protected Boolean initialValue() {
100      return Boolean.FALSE;
101    }
102  };
103
104  /*
105   * Disable IO mode. IO mode can be forcefully disabled if compactor finds old MOB file
106   * (pre-distributed compaction). This means that migration has not been completed yet. During data
107   * migration (upgrade) process only general compaction is allowed.
108   */
109
110  static ThreadLocal<Boolean> disableIO = new ThreadLocal<Boolean>() {
111
112    @Override
113    protected Boolean initialValue() {
114      return Boolean.FALSE;
115    }
116  };
117
118  /*
119   * Map : MOB file name - file length Can be expensive for large amount of MOB files.
120   */
121  static ThreadLocal<HashMap<String, Long>> mobLengthMap =
122    new ThreadLocal<HashMap<String, Long>>() {
123      @Override
124      protected HashMap<String, Long> initialValue() {
125        return new HashMap<String, Long>();
126      }
127    };
128
129  private final InternalScannerFactory scannerFactory = new InternalScannerFactory() {
130
131    @Override
132    public ScanType getScanType(CompactionRequestImpl request) {
133      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
134    }
135
136    @Override
137    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
138      ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
139      return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
140        fd.earliestPutTs);
141    }
142  };
143
144  private final CellSinkFactory<StoreFileWriter> writerFactory =
145    new CellSinkFactory<StoreFileWriter>() {
146      @Override
147      public StoreFileWriter createWriter(InternalScanner scanner,
148        org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
149        boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
150        throws IOException {
151        // make this writer with tags always because of possible new cells with tags.
152        return store.getStoreEngine()
153          .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker)
154            .includeMVCCReadpoint(true).includesTag(true));
155      }
156    };
157
158  public DefaultMobStoreCompactor(Configuration conf, HStore store) {
159    super(conf, store);
160    // The mob cells reside in the mob-enabled column family which is held by HMobStore.
161    // During the compaction, the compactor reads the cells from the mob files and
162    // probably creates new mob files. All of these operations are included in HMobStore,
163    // so we need to cast the Store to HMobStore.
164    if (!(store instanceof HMobStore)) {
165      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
166    }
167    this.mobStore = (HMobStore) store;
168    this.mobSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold();
169    this.ioOptimizedMode =
170      conf.get(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.DEFAULT_MOB_COMPACTION_TYPE)
171        .equals(MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
172
173  }
174
175  @Override
176  public List<Path> compact(CompactionRequestImpl request,
177    ThroughputController throughputController, User user) throws IOException {
178    String tableName = store.getTableName().toString();
179    String regionName = store.getRegionInfo().getRegionNameAsString();
180    String familyName = store.getColumnFamilyName();
181    LOG.info(
182      "MOB compaction: major={} isAll={} priority={} throughput controller={}"
183        + " table={} cf={} region={}",
184      request.isMajor(), request.isAllFiles(), request.getPriority(), throughputController,
185      tableName, familyName, regionName);
186    if (request.getPriority() == HStore.PRIORITY_USER) {
187      userRequest.set(Boolean.TRUE);
188    } else {
189      userRequest.set(Boolean.FALSE);
190    }
191    LOG.debug("MOB compaction table={} cf={} region={} files: {}", tableName, familyName,
192      regionName, request.getFiles());
193    // Check if I/O optimized MOB compaction
194    if (ioOptimizedMode) {
195      if (request.isMajor() && request.getPriority() == HStore.PRIORITY_USER) {
196        try {
197          final SetMultimap<TableName, String> mobRefs = request.getFiles().stream().map(file -> {
198            byte[] value = file.getMetadataValue(HStoreFile.MOB_FILE_REFS);
199            ImmutableSetMultimap.Builder<TableName, String> builder;
200            if (value == null) {
201              builder = ImmutableSetMultimap.builder();
202            } else {
203              try {
204                builder = MobUtils.deserializeMobFileRefs(value);
205              } catch (RuntimeException exception) {
206                throw new RuntimeException("failure getting mob references for hfile " + file,
207                  exception);
208              }
209            }
210            return builder;
211          }).reduce((a, b) -> a.putAll(b.build())).orElseGet(ImmutableSetMultimap::builder).build();
212          // reset disableIO
213          disableIO.set(Boolean.FALSE);
214          if (!mobRefs.isEmpty()) {
215            calculateMobLengthMap(mobRefs);
216          }
217          LOG.info(
218            "Table={} cf={} region={}. I/O optimized MOB compaction. "
219              + "Total referenced MOB files: {}",
220            tableName, familyName, regionName, mobRefs.size());
221        } catch (RuntimeException exception) {
222          throw new IOException("Failed to get list of referenced hfiles for request " + request,
223            exception);
224        }
225      }
226    }
227
228    return compact(request, scannerFactory, writerFactory, throughputController, user);
229  }
230
231  /**
232   * @param mobRefs multimap of original table name -> mob hfile
233   */
234  private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throws IOException {
235    FileSystem fs = store.getFileSystem();
236    HashMap<String, Long> map = mobLengthMap.get();
237    map.clear();
238    for (Entry<TableName, String> reference : mobRefs.entries()) {
239      final TableName table = reference.getKey();
240      final String mobfile = reference.getValue();
241      if (MobFileName.isOldMobFileName(mobfile)) {
242        disableIO.set(Boolean.TRUE);
243      }
244      List<Path> locations = mobStore.getLocations(table);
245      for (Path p : locations) {
246        try {
247          FileStatus st = fs.getFileStatus(new Path(p, mobfile));
248          long size = st.getLen();
249          LOG.debug("Referenced MOB file={} size={}", mobfile, size);
250          map.put(mobfile, size);
251          break;
252        } catch (FileNotFoundException exception) {
253          LOG.debug("Mob file {} was not in location {}. May have other locations to try.", mobfile,
254            p);
255        }
256      }
257      if (!map.containsKey(mobfile)) {
258        throw new FileNotFoundException("Could not find mob file " + mobfile + " in the list of "
259          + "expected locations: " + locations);
260      }
261    }
262  }
263
264  /**
265   * Performs compaction on a column family with the mob flag enabled. This works only when MOB
266   * compaction is explicitly requested (by User), or by Master There are two modes of a MOB
267   * compaction:<br>
268   * <p>
269   * <ul>
270   * <li>1. Full mode - when all MOB data for a region is compacted into a single MOB file.
271   * <li>2. I/O optimized mode - for use cases with no or infrequent updates/deletes of a <br>
272   * MOB data. The main idea behind i/o optimized compaction is to limit maximum size of a MOB file
273   * produced during compaction and to limit I/O write/read amplification.
274   * </ul>
275   * The basic algorithm of compaction is the following: <br>
276   * 1. If the Put cell has a mob reference tag, the cell's value is the path of the mob file.
277   * <ol>
278   * <li>If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
279   * directly copy the (with mob tag) cell into the new store file.</li>
280   * <li>Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into the
281   * new store file.</li>
282   * </ol>
283   * 2. If the Put cell doesn't have a reference tag.
284   * <ol>
285   * <li>If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
286   * write this cell to a mob file, and write the path of this mob file to the store file.</li>
287   * <li>Otherwise, directly write this cell into the store file.</li>
288   * </ol>
289   * @param fd                   File details
290   * @param scanner              Where to read from.
291   * @param writer               Where to write to.
292   * @param smallestReadPoint    Smallest read point.
293   * @param cleanSeqId           When true, remove seqId(used to be mvcc) value which is <=
294   *                             smallestReadPoint
295   * @param throughputController The compaction throughput controller.
296   * @param request              compaction request.
297   * @param progress             Progress reporter.
298   * @return Whether compaction ended; false if it was interrupted for any reason.
299   */
300  @Override
301  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
302    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
303    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
304    long bytesWrittenProgressForLog = 0;
305    long bytesWrittenProgressForShippedCall = 0;
306    // Clear old mob references
307    mobRefSet.get().clear();
308    boolean isUserRequest = userRequest.get();
309    boolean major = request.isAllFiles();
310    boolean compactMOBs = major && isUserRequest;
311    boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
312      MobConstants.DEFAULT_MOB_DISCARD_MISS);
313    if (discardMobMiss) {
314      LOG.warn("{}=true. This is unsafe setting recommended only when first upgrading to a version"
315        + " with the distributed mob compaction feature on a cluster that has experienced MOB data "
316        + "corruption.", MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY);
317    }
318    long maxMobFileSize = conf.getLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY,
319      MobConstants.DEFAULT_MOB_COMPACTION_MAX_FILE_SIZE);
320    boolean ioOptimizedMode = this.ioOptimizedMode && !disableIO.get();
321    LOG.info(
322      "Compact MOB={} optimized configured={} optimized enabled={} maximum MOB file size={}"
323        + " major={} store={}",
324      compactMOBs, this.ioOptimizedMode, ioOptimizedMode, maxMobFileSize, major, getStoreInfo());
325    // Since scanner.next() can return 'false' but still be delivering data,
326    // we have to use a do/while loop.
327    List<ExtendedCell> cells = new ArrayList<>();
328    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
329    long currentTime = EnvironmentEdgeManager.currentTime();
330    long lastMillis = 0;
331    if (LOG.isDebugEnabled()) {
332      lastMillis = currentTime;
333    }
334    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
335    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
336    long now = 0;
337    boolean hasMore;
338    byte[] fileName = null;
339    StoreFileWriter mobFileWriter = null;
340    /*
341     * mobCells are used only to decide if we need to commit or abort current MOB output file.
342     */
343    long mobCells = 0;
344    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
345    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
346    boolean finished = false;
347
348    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
349      .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
350        compactScannerSizeLimit)
351      .build();
352    throughputController.start(compactionName);
353    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
354    long shippedCallSizeLimit =
355      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
356
357    ExtendedCell mobCell = null;
358    List<String> committedMobWriterFileNames = new ArrayList<>();
359    try {
360
361      mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
362      fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
363
364      do {
365        hasMore = scanner.next(cells, scannerContext);
366        currentTime = EnvironmentEdgeManager.currentTime();
367        if (LOG.isDebugEnabled()) {
368          now = currentTime;
369        }
370        if (closeChecker.isTimeLimit(store, currentTime)) {
371          progress.cancel();
372          return false;
373        }
374        for (ExtendedCell c : cells) {
375          if (compactMOBs) {
376            if (MobUtils.isMobReferenceCell(c)) {
377              String fName = MobUtils.getMobFileName(c);
378              // Added to support migration
379              try {
380                mobCell = mobStore.resolve(c, true, false).getCell();
381              } catch (DoNotRetryIOException e) {
382                if (
383                  discardMobMiss && e.getCause() != null
384                    && e.getCause() instanceof FileNotFoundException
385                ) {
386                  LOG.error("Missing MOB cell: file={} not found cell={}", fName, c);
387                  continue;
388                } else {
389                  throw e;
390                }
391              }
392
393              if (discardMobMiss && mobCell.getValueLength() == 0) {
394                LOG.error("Missing MOB cell value: file={} mob cell={} cell={}", fName, mobCell, c);
395                continue;
396              } else if (mobCell.getValueLength() == 0) {
397                String errMsg =
398                  String.format("Found 0 length MOB cell in a file=%s mob cell=%s " + " cell=%s",
399                    fName, mobCell, c);
400                throw new IOException(errMsg);
401              }
402
403              if (mobCell.getValueLength() > mobSizeThreshold) {
404                // put the mob data back to the MOB store file
405                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
406                if (!ioOptimizedMode) {
407                  mobFileWriter.append(mobCell);
408                  mobCells++;
409                  writer.append(
410                    MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
411                } else {
412                  // I/O optimized mode
413                  // Check if MOB cell origin file size is
414                  // greater than threshold
415                  Long size = mobLengthMap.get().get(fName);
416                  if (size == null) {
417                    // FATAL error (we should never get here though), abort compaction
418                    // This error means that meta section of store file does not contain
419                    // MOB file, which has references in at least one cell from this store file
420                    String msg = String.format(
421                      "Found an unexpected MOB file during compaction %s, aborting compaction %s",
422                      fName, getStoreInfo());
423                    throw new IOException(msg);
424                  }
425                  // Can not be null
426                  if (size < maxMobFileSize) {
427                    // If MOB cell origin file is below threshold
428                    // it is get compacted
429                    mobFileWriter.append(mobCell);
430                    // Update number of mobCells in a current mob writer
431                    mobCells++;
432                    writer.append(
433                      MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
434                    // Update total size of the output (we do not take into account
435                    // file compression yet)
436                    long len = mobFileWriter.getPos();
437                    if (len > maxMobFileSize) {
438                      LOG.debug("Closing output MOB File, length={} file={}, store={}", len,
439                        mobFileWriter.getPath().getName(), getStoreInfo());
440                      mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major,
441                        request, committedMobWriterFileNames);
442                      fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
443                      mobCells = 0;
444                    }
445                  } else {
446                    // We leave large MOB file as is (is not compacted),
447                    // then we update set of MOB file references
448                    // and append mob cell directly to the store's writer
449                    Optional<TableName> refTable = MobUtils.getTableName(c);
450                    if (refTable.isPresent()) {
451                      mobRefSet.get().put(refTable.get(), fName);
452                      writer.append(c);
453                    } else {
454                      throw new IOException(String.format("MOB cell did not contain a tablename "
455                        + "tag. should not be possible. see ref guide on mob troubleshooting. "
456                        + "store=%s cell=%s", getStoreInfo(), c));
457                    }
458                  }
459                }
460              } else {
461                // If MOB value is less than threshold, append it directly to a store file
462                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
463                writer.append(mobCell);
464                cellsCountCompactedFromMob++;
465                cellsSizeCompactedFromMob += mobCell.getValueLength();
466              }
467            } else {
468              // Not a MOB reference cell
469              int size = c.getValueLength();
470              if (size > mobSizeThreshold) {
471                // This MOB cell comes from a regular store file
472                // therefore we store it into original mob output
473                mobFileWriter.append(c);
474                writer
475                  .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()));
476                mobCells++;
477                cellsCountCompactedToMob++;
478                cellsSizeCompactedToMob += c.getValueLength();
479                if (ioOptimizedMode) {
480                  // Update total size of the output (we do not take into account
481                  // file compression yet)
482                  long len = mobFileWriter.getPos();
483                  if (len > maxMobFileSize) {
484                    mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major,
485                      request, committedMobWriterFileNames);
486                    fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
487                    mobCells = 0;
488                  }
489                }
490              } else {
491                // Not a MOB cell, write it directly to a store file
492                writer.append(c);
493              }
494            }
495          } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) {
496            // Not a major compaction or major with MOB disabled
497            // If the kv type is not put, directly write the cell
498            // to the store file.
499            writer.append(c);
500          } else if (MobUtils.isMobReferenceCell(c)) {
501            // Not a major MOB compaction, Put MOB reference
502            if (MobUtils.hasValidMobRefCellValue(c)) {
503              // We do not check mobSizeThreshold during normal compaction,
504              // leaving it to a MOB compaction run
505              Optional<TableName> refTable = MobUtils.getTableName(c);
506              if (refTable.isPresent()) {
507                mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
508                writer.append(c);
509              } else {
510                throw new IOException(String.format("MOB cell did not contain a tablename "
511                  + "tag. should not be possible. see ref guide on mob troubleshooting. "
512                  + "store=%s cell=%s", getStoreInfo(), c));
513              }
514            } else {
515              String errMsg = String.format("Corrupted MOB reference: %s", c.toString());
516              throw new IOException(errMsg);
517            }
518          } else if (c.getValueLength() <= mobSizeThreshold) {
519            // If the value size of a cell is not larger than the threshold, directly write it to
520            // the store file.
521            writer.append(c);
522          } else {
523            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
524            // write this cell to a mob file, and write the path to the store file.
525            mobCells++;
526            // append the original keyValue in the mob file.
527            mobFileWriter.append(c);
528            ExtendedCell reference =
529              MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags());
530            // write the cell whose value is the path of a mob file to the store file.
531            writer.append(reference);
532            cellsCountCompactedToMob++;
533            cellsSizeCompactedToMob += c.getValueLength();
534            if (ioOptimizedMode) {
535              long len = mobFileWriter.getPos();
536              if (len > maxMobFileSize) {
537                mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major, request,
538                  committedMobWriterFileNames);
539                fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
540                mobCells = 0;
541              }
542            }
543          }
544
545          int len = c.getSerializedSize();
546          ++progress.currentCompactedKVs;
547          progress.totalCompactedSize += len;
548          bytesWrittenProgressForShippedCall += len;
549          if (LOG.isDebugEnabled()) {
550            bytesWrittenProgressForLog += len;
551          }
552          throughputController.control(compactionName, len);
553          if (closeChecker.isSizeLimit(store, len)) {
554            progress.cancel();
555            return false;
556          }
557          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
558            ((ShipperListener) writer).beforeShipped();
559            kvs.shipped();
560            bytesWrittenProgressForShippedCall = 0;
561          }
562        }
563        // Log the progress of long running compactions every minute if
564        // logging at DEBUG level
565        if (LOG.isDebugEnabled()) {
566          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
567            String rate = String.format("%.2f",
568              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
569            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
570              compactionName, progress, rate, throughputController);
571            lastMillis = now;
572            bytesWrittenProgressForLog = 0;
573          }
574        }
575        cells.clear();
576      } while (hasMore);
577      // Commit last MOB writer
578      commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
579      finished = true;
580    } catch (InterruptedException e) {
581      progress.cancel();
582      throw new InterruptedIOException(
583        "Interrupted while control throughput of compacting " + compactionName);
584    } catch (IOException t) {
585      String msg = "Mob compaction failed for region: " + store.getRegionInfo().getEncodedName();
586      throw new IOException(msg, t);
587    } finally {
588      // Clone last cell in the final because writer will append last cell when committing. If
589      // don't clone here and once the scanner get closed, then the memory of last cell will be
590      // released. (HBASE-22582)
591      ((ShipperListener) writer).beforeShipped();
592      throughputController.finish(compactionName);
593      if (!finished && mobFileWriter != null) {
594        // Remove all MOB references because compaction failed
595        clearThreadLocals();
596        // Abort writer
597        LOG.debug("Aborting writer for {} because of a compaction failure, Store {}",
598          mobFileWriter.getPath(), getStoreInfo());
599        abortWriter(mobFileWriter);
600        deleteCommittedMobFiles(committedMobWriterFileNames);
601      }
602    }
603
604    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
605    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
606    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
607    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
608    progress.complete();
609    return true;
610  }
611
612  protected String getStoreInfo() {
613    return String.format("[table=%s family=%s region=%s]", store.getTableName().getNameAsString(),
614      store.getColumnFamilyName(), store.getRegionInfo().getEncodedName());
615  }
616
617  private void clearThreadLocals() {
618    mobRefSet.get().clear();
619    HashMap<String, Long> map = mobLengthMap.get();
620    if (map != null) {
621      map.clear();
622    }
623  }
624
625  private StoreFileWriter newMobWriter(FileDetails fd, boolean major,
626    Consumer<Path> writerCreationTracker) throws IOException {
627    try {
628      StoreFileWriter mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst()
629        ? mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
630          major ? majorCompactionCompression : minorCompactionCompression,
631          store.getRegionInfo().getStartKey(), true)
632        : mobStore.createWriter(new Date(fd.latestPutTs), fd.maxKeyCount,
633          major ? majorCompactionCompression : minorCompactionCompression,
634          store.getRegionInfo().getStartKey(), true, writerCreationTracker);
635      LOG.debug("New MOB writer created={} store={}", mobFileWriter.getPath().getName(),
636        getStoreInfo());
637      // Add reference we get for compact MOB
638      mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
639      return mobFileWriter;
640    } catch (IOException e) {
641      // Bailing out
642      throw new IOException(String.format("Failed to create mob writer, store=%s", getStoreInfo()),
643        e);
644    }
645  }
646
647  private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId, long mobCells,
648    boolean major) throws IOException {
649    // Commit or abort major mob writer
650    // If IOException happens during below operation, some
651    // MOB files can be committed partially, but corresponding
652    // store file won't be committed, therefore these MOB files
653    // become orphans and will be deleted during next MOB cleaning chore cycle
654
655    if (mobFileWriter != null) {
656      LOG.debug("Commit or abort size={} mobCells={} major={} file={}, store={}",
657        mobFileWriter.getPos(), mobCells, major, mobFileWriter.getPath().getName(), getStoreInfo());
658      Path path =
659        MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
660      if (mobCells > 0) {
661        // If the mob file is not empty, commit it.
662        mobFileWriter.appendMetadata(maxSeqId, major, mobCells);
663        mobFileWriter.close();
664        mobStore.commitFile(mobFileWriter.getPath(), path);
665      } else {
666        // If the mob file is empty, delete it instead of committing.
667        LOG.debug("Aborting writer for {} because there are no MOB cells, store={}",
668          mobFileWriter.getPath(), getStoreInfo());
669        // Remove MOB file from reference set
670        mobRefSet.get().remove(store.getTableName(), mobFileWriter.getPath().getName());
671        abortWriter(mobFileWriter);
672      }
673    } else {
674      LOG.debug("Mob file writer is null, skipping commit/abort, store=", getStoreInfo());
675    }
676  }
677
678  @Override
679  protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
680    CompactionRequestImpl request) throws IOException {
681    List<Path> newFiles = Lists.newArrayList(writer.getPath());
682    writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
683    writer.appendMobMetadata(mobRefSet.get());
684    writer.close();
685    clearThreadLocals();
686    return newFiles;
687  }
688
689  private StoreFileWriter switchToNewMobWriter(StoreFileWriter mobFileWriter, FileDetails fd,
690    long mobCells, boolean major, CompactionRequestImpl request,
691    List<String> committedMobWriterFileNames) throws IOException {
692    commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
693    committedMobWriterFileNames.add(mobFileWriter.getPath().getName());
694    return newMobWriter(fd, major, request.getWriterCreationTracker());
695  }
696
697  private void deleteCommittedMobFiles(List<String> fileNames) {
698    if (fileNames.isEmpty()) {
699      return;
700    }
701    Path mobColumnFamilyPath =
702      MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
703    for (String fileName : fileNames) {
704      if (fileName == null) {
705        continue;
706      }
707      Path path = new Path(mobColumnFamilyPath, fileName);
708      try {
709        if (store.getFileSystem().exists(path)) {
710          store.getFileSystem().delete(path, false);
711        }
712      } catch (IOException e) {
713        LOG.warn("Failed to delete the mob file {} for an failed mob compaction.", path, e);
714      }
715    }
716
717  }
718
719}