001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.Date;
025import java.util.List;
026import java.util.Optional;
027import java.util.concurrent.ThreadLocalRandom;
028import java.util.concurrent.atomic.AtomicLong;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.ExtendedCell;
035import org.apache.hadoop.hbase.KeyValue;
036import org.apache.hadoop.hbase.PrivateCellUtil;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
039import org.apache.hadoop.hbase.regionserver.CellSink;
040import org.apache.hadoop.hbase.regionserver.HStore;
041import org.apache.hadoop.hbase.regionserver.InternalScanner;
042import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
043import org.apache.hadoop.hbase.regionserver.ScannerContext;
044import org.apache.hadoop.hbase.regionserver.ShipperListener;
045import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
046import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
047import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
048import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
049import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
050import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
053import org.apache.yetus.audience.InterfaceAudience;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057/**
058 * This class is used for testing only. The main purpose is to emulate random failures during MOB
059 * compaction process. Example of usage:
060 *
061 * <pre>
062 * {
063 *   &#64;code
064 *   public class SomeTest {
065 *
066 *     public void initConfiguration(Configuration conf) {
067 *       conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY,
068 *         FaultyMobStoreCompactor.class.getName());
069 *       conf.setDouble("hbase.mob.compaction.fault.probability", 0.1);
070 *     }
071 *   }
072 * }
073 * </pre>
074 *
075 * @see org.apache.hadoop.hbase.mob.MobStressToolRunner on how to use and configure this class.
076 */
077@InterfaceAudience.Private
078public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
079
080  private static final Logger LOG = LoggerFactory.getLogger(FaultyMobStoreCompactor.class);
081
082  public static AtomicLong mobCounter = new AtomicLong();
083  public static AtomicLong totalFailures = new AtomicLong();
084  public static AtomicLong totalCompactions = new AtomicLong();
085  public static AtomicLong totalMajorCompactions = new AtomicLong();
086
087  static double failureProb = 0.1d;
088
089  public FaultyMobStoreCompactor(Configuration conf, HStore store) {
090    super(conf, store);
091    failureProb = conf.getDouble("hbase.mob.compaction.fault.probability", 0.1);
092  }
093
094  @Override
095  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
096    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
097    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
098
099    boolean major = request.isAllFiles();
100    totalCompactions.incrementAndGet();
101    if (major) {
102      totalMajorCompactions.incrementAndGet();
103    }
104    long bytesWrittenProgressForLog = 0;
105    long bytesWrittenProgressForShippedCall = 0;
106    // Clear old mob references
107    mobRefSet.get().clear();
108    boolean isUserRequest = userRequest.get();
109    boolean compactMOBs = major && isUserRequest;
110    boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
111      MobConstants.DEFAULT_MOB_DISCARD_MISS);
112
113    boolean mustFail = false;
114    if (compactMOBs) {
115      mobCounter.incrementAndGet();
116      double dv = ThreadLocalRandom.current().nextDouble();
117      if (dv < failureProb) {
118        mustFail = true;
119        totalFailures.incrementAndGet();
120      }
121    }
122
123    FileSystem fs = store.getFileSystem();
124
125    // Since scanner.next() can return 'false' but still be delivering data,
126    // we have to use a do/while loop.
127    List<Cell> cells = new ArrayList<>();
128    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
129    long currentTime = EnvironmentEdgeManager.currentTime();
130    long lastMillis = 0;
131    if (LOG.isDebugEnabled()) {
132      lastMillis = currentTime;
133    }
134    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
135    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
136    long now = 0;
137    boolean hasMore;
138    Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
139    byte[] fileName = null;
140    StoreFileWriter mobFileWriter = null;
141    long mobCells = 0;
142    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
143    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
144    boolean finished = false;
145
146    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
147      .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
148        compactScannerSizeLimit)
149      .build();
150    throughputController.start(compactionName);
151    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
152    long shippedCallSizeLimit =
153      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
154
155    ExtendedCell mobCell = null;
156
157    long counter = 0;
158    long countFailAt = -1;
159    if (mustFail) {
160      countFailAt = ThreadLocalRandom.current().nextInt(100); // randomly fail fast
161    }
162
163    try {
164      try {
165        mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
166          major ? majorCompactionCompression : minorCompactionCompression,
167          store.getRegionInfo().getStartKey(), true);
168        fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
169      } catch (IOException e) {
170        // Bailing out
171        LOG.error("Failed to create mob writer, ", e);
172        throw e;
173      }
174      if (compactMOBs) {
175        // Add the only reference we get for compact MOB case
176        // because new store file will have only one MOB reference
177        // in this case - of newly compacted MOB file
178        mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
179      }
180      do {
181        hasMore = scanner.next(cells, scannerContext);
182        currentTime = EnvironmentEdgeManager.currentTime();
183        if (LOG.isDebugEnabled()) {
184          now = currentTime;
185        }
186        if (closeChecker.isTimeLimit(store, currentTime)) {
187          progress.cancel();
188          return false;
189        }
190        for (Cell cell : cells) {
191          ExtendedCell c = (ExtendedCell) cell;
192          counter++;
193          if (compactMOBs) {
194            if (MobUtils.isMobReferenceCell(c)) {
195              if (counter == countFailAt) {
196                LOG.warn("INJECTED FAULT mobCounter={}", mobCounter.get());
197                throw new CorruptHFileException("injected fault");
198              }
199              String fName = MobUtils.getMobFileName(c);
200              // Added to support migration
201              try {
202                mobCell = mobStore.resolve(c, true, false).getCell();
203              } catch (DoNotRetryIOException e) {
204                if (
205                  discardMobMiss && e.getCause() != null
206                    && e.getCause() instanceof FileNotFoundException
207                ) {
208                  LOG.error("Missing MOB cell: file={} not found cell={}", fName, c);
209                  continue;
210                } else {
211                  throw e;
212                }
213              }
214
215              if (discardMobMiss && mobCell.getValueLength() == 0) {
216                LOG.error("Missing MOB cell value: file={} cell={}", fName, mobCell);
217                continue;
218              }
219
220              if (mobCell.getValueLength() > mobSizeThreshold) {
221                // put the mob data back to the store file
222                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
223                mobFileWriter.append(mobCell);
224                writer.append(
225                  MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
226                mobCells++;
227              } else {
228                // If MOB value is less than threshold, append it directly to a store file
229                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
230                writer.append(mobCell);
231                cellsCountCompactedFromMob++;
232                cellsSizeCompactedFromMob += mobCell.getValueLength();
233              }
234            } else {
235              // Not a MOB reference cell
236              int size = c.getValueLength();
237              if (size > mobSizeThreshold) {
238                mobFileWriter.append(c);
239                writer
240                  .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()));
241                mobCells++;
242                cellsCountCompactedToMob++;
243                cellsSizeCompactedToMob += c.getValueLength();
244              } else {
245                writer.append(c);
246              }
247            }
248          } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) {
249            // Not a major compaction or major with MOB disabled
250            // If the kv type is not put, directly write the cell
251            // to the store file.
252            writer.append(c);
253          } else if (MobUtils.isMobReferenceCell(c)) {
254            // Not a major MOB compaction, Put MOB reference
255            if (MobUtils.hasValidMobRefCellValue(c)) {
256              int size = MobUtils.getMobValueLength(c);
257              if (size > mobSizeThreshold) {
258                // If the value size is larger than the threshold, it's regarded as a mob. Since
259                // its value is already in the mob file, directly write this cell to the store file
260                Optional<TableName> refTable = MobUtils.getTableName(c);
261                if (refTable.isPresent()) {
262                  mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
263                  writer.append(c);
264                } else {
265                  throw new IOException(String.format("MOB cell did not contain a tablename "
266                    + "tag. should not be possible. see ref guide on mob troubleshooting. "
267                    + "store=%s cell=%s", getStoreInfo(), c));
268                }
269              } else {
270                // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
271                // the mob cell from the mob file, and write it back to the store file.
272                mobCell = mobStore.resolve(c, true, false).getCell();
273                if (mobCell.getValueLength() != 0) {
274                  // put the mob data back to the store file
275                  PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
276                  writer.append(mobCell);
277                  cellsCountCompactedFromMob++;
278                  cellsSizeCompactedFromMob += mobCell.getValueLength();
279                } else {
280                  // If the value of a file is empty, there might be issues when retrieving,
281                  // directly write the cell to the store file, and leave it to be handled by the
282                  // next compaction.
283                  LOG.error("Empty value for: " + c);
284                  Optional<TableName> refTable = MobUtils.getTableName(c);
285                  if (refTable.isPresent()) {
286                    mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
287                    writer.append(c);
288                  } else {
289                    throw new IOException(String.format("MOB cell did not contain a tablename "
290                      + "tag. should not be possible. see ref guide on mob troubleshooting. "
291                      + "store=%s cell=%s", getStoreInfo(), c));
292                  }
293                }
294              }
295            } else {
296              LOG.error("Corrupted MOB reference: {}", c);
297              writer.append(c);
298            }
299          } else if (c.getValueLength() <= mobSizeThreshold) {
300            // If the value size of a cell is not larger than the threshold, directly write it to
301            // the store file.
302            writer.append(c);
303          } else {
304            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
305            // write this cell to a mob file, and write the path to the store file.
306            mobCells++;
307            // append the original keyValue in the mob file.
308            mobFileWriter.append(c);
309            ExtendedCell reference =
310              MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags());
311            // write the cell whose value is the path of a mob file to the store file.
312            writer.append(reference);
313            cellsCountCompactedToMob++;
314            cellsSizeCompactedToMob += c.getValueLength();
315            // Add ref we get for compact MOB case
316            mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
317          }
318
319          int len = c.getSerializedSize();
320          ++progress.currentCompactedKVs;
321          progress.totalCompactedSize += len;
322          bytesWrittenProgressForShippedCall += len;
323          if (LOG.isDebugEnabled()) {
324            bytesWrittenProgressForLog += len;
325          }
326          throughputController.control(compactionName, len);
327          if (closeChecker.isSizeLimit(store, len)) {
328            progress.cancel();
329            return false;
330          }
331          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
332            ((ShipperListener) writer).beforeShipped();
333            kvs.shipped();
334            bytesWrittenProgressForShippedCall = 0;
335          }
336        }
337        // Log the progress of long running compactions every minute if
338        // logging at DEBUG level
339        if (LOG.isDebugEnabled()) {
340          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
341            String rate = String.format("%.2f",
342              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
343            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
344              compactionName, progress, rate, throughputController);
345            lastMillis = now;
346            bytesWrittenProgressForLog = 0;
347          }
348        }
349        cells.clear();
350      } while (hasMore);
351      finished = true;
352    } catch (InterruptedException e) {
353      progress.cancel();
354      throw new InterruptedIOException(
355        "Interrupted while control throughput of compacting " + compactionName);
356    } catch (FileNotFoundException e) {
357      LOG.error("MOB Stress Test FAILED, region: " + store.getRegionInfo().getEncodedName(), e);
358      System.exit(-1);
359    } catch (IOException t) {
360      LOG.error("Mob compaction failed for region: " + store.getRegionInfo().getEncodedName());
361      throw t;
362    } finally {
363      // Clone last cell in the final because writer will append last cell when committing. If
364      // don't clone here and once the scanner get closed, then the memory of last cell will be
365      // released. (HBASE-22582)
366      ((ShipperListener) writer).beforeShipped();
367      throughputController.finish(compactionName);
368      if (!finished && mobFileWriter != null) {
369        // Remove all MOB references because compaction failed
370        mobRefSet.get().clear();
371        // Abort writer
372        abortWriter(mobFileWriter);
373      }
374    }
375
376    if (mobFileWriter != null) {
377      if (mobCells > 0) {
378        // If the mob file is not empty, commit it.
379        mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
380        mobFileWriter.close();
381        mobStore.commitFile(mobFileWriter.getPath(), path);
382      } else {
383        // If the mob file is empty, delete it instead of committing.
384        abortWriter(mobFileWriter);
385      }
386    }
387    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
388    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
389    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
390    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
391    progress.complete();
392    return true;
393
394  }
395
396}