001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.Date;
025import java.util.List;
026import java.util.Optional;
027import java.util.Random;
028import java.util.concurrent.atomic.AtomicLong;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.hadoop.hbase.PrivateCellUtil;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
036import org.apache.hadoop.hbase.regionserver.CellSink;
037import org.apache.hadoop.hbase.regionserver.HStore;
038import org.apache.hadoop.hbase.regionserver.InternalScanner;
039import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
040import org.apache.hadoop.hbase.regionserver.ScannerContext;
041import org.apache.hadoop.hbase.regionserver.ShipperListener;
042import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
043import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
044import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
046import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
047import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054/**
055 * This class is used for testing only. The main purpose is to emulate random failures during MOB
056 * compaction process. Example of usage:
057 *
058 * <pre>
059 * {
060 *   &#64;code
061 *   public class SomeTest {
062 *
063 *     public void initConfiguration(Configuration conf) {
064 *       conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY,
065 *         FaultyMobStoreCompactor.class.getName());
066 *       conf.setDouble("hbase.mob.compaction.fault.probability", 0.1);
067 *     }
068 *   }
069 * }
070 * </pre>
071 *
072 * @see org.apache.hadoop.hbase.mob.MobStressToolRunner on how to use and configure this class.
073 */
074@InterfaceAudience.Private
075public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
076
077  private static final Logger LOG = LoggerFactory.getLogger(FaultyMobStoreCompactor.class);
078
079  public static AtomicLong mobCounter = new AtomicLong();
080  public static AtomicLong totalFailures = new AtomicLong();
081  public static AtomicLong totalCompactions = new AtomicLong();
082  public static AtomicLong totalMajorCompactions = new AtomicLong();
083
084  static double failureProb = 0.1d;
085  static Random rnd = new Random();
086
087  public FaultyMobStoreCompactor(Configuration conf, HStore store) {
088    super(conf, store);
089    failureProb = conf.getDouble("hbase.mob.compaction.fault.probability", 0.1);
090  }
091
092  @Override
093  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
094    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
095    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
096
097    boolean major = request.isAllFiles();
098    totalCompactions.incrementAndGet();
099    if (major) {
100      totalMajorCompactions.incrementAndGet();
101    }
102    long bytesWrittenProgressForCloseCheck = 0;
103    long bytesWrittenProgressForLog = 0;
104    long bytesWrittenProgressForShippedCall = 0;
105    // Clear old mob references
106    mobRefSet.get().clear();
107    boolean isUserRequest = userRequest.get();
108    boolean compactMOBs = major && isUserRequest;
109    boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
110      MobConstants.DEFAULT_MOB_DISCARD_MISS);
111
112    boolean mustFail = false;
113    if (compactMOBs) {
114      mobCounter.incrementAndGet();
115      double dv = rnd.nextDouble();
116      if (dv < failureProb) {
117        mustFail = true;
118        totalFailures.incrementAndGet();
119      }
120    }
121
122    // Since scanner.next() can return 'false' but still be delivering data,
123    // we have to use a do/while loop.
124    List<Cell> cells = new ArrayList<>();
125    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
126    int closeCheckSizeLimit =
127      conf.getInt(CloseChecker.SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);
128    long lastMillis = 0;
129    if (LOG.isDebugEnabled()) {
130      lastMillis = EnvironmentEdgeManager.currentTime();
131    }
132    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
133    long now = 0;
134    boolean hasMore;
135    Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
136    byte[] fileName = null;
137    StoreFileWriter mobFileWriter = null;
138    long mobCells = 0;
139    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
140    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
141    boolean finished = false;
142
143    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
144      .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
145        compactScannerSizeLimit)
146      .build();
147    throughputController.start(compactionName);
148    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
149    long shippedCallSizeLimit =
150      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
151
152    Cell mobCell = null;
153
154    long counter = 0;
155    long countFailAt = -1;
156    if (mustFail) {
157      countFailAt = rnd.nextInt(100); // randomly fail fast
158    }
159
160    try {
161      try {
162        mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
163          major ? majorCompactionCompression : minorCompactionCompression,
164          store.getRegionInfo().getStartKey(), true);
165        fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
166      } catch (IOException e) {
167        // Bailing out
168        LOG.error("Failed to create mob writer, ", e);
169        throw e;
170      }
171      if (compactMOBs) {
172        // Add the only reference we get for compact MOB case
173        // because new store file will have only one MOB reference
174        // in this case - of newly compacted MOB file
175        mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
176      }
177      do {
178        hasMore = scanner.next(cells, scannerContext);
179        if (LOG.isDebugEnabled()) {
180          now = EnvironmentEdgeManager.currentTime();
181        }
182        for (Cell c : cells) {
183          counter++;
184          if (compactMOBs) {
185            if (MobUtils.isMobReferenceCell(c)) {
186              if (counter == countFailAt) {
187                LOG.warn("INJECTED FAULT mobCounter={}", mobCounter.get());
188                throw new CorruptHFileException("injected fault");
189              }
190              String fName = MobUtils.getMobFileName(c);
191              // Added to support migration
192              try {
193                mobCell = mobStore.resolve(c, true, false).getCell();
194              } catch (FileNotFoundException fnfe) {
195                if (discardMobMiss) {
196                  LOG.error("Missing MOB cell: file={} not found", fName);
197                  continue;
198                } else {
199                  throw fnfe;
200                }
201              }
202
203              if (discardMobMiss && mobCell.getValueLength() == 0) {
204                LOG.error("Missing MOB cell value: file={} cell={}", fName, mobCell);
205                continue;
206              }
207
208              if (mobCell.getValueLength() > mobSizeThreshold) {
209                // put the mob data back to the store file
210                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
211                mobFileWriter.append(mobCell);
212                writer.append(
213                  MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
214                mobCells++;
215              } else {
216                // If MOB value is less than threshold, append it directly to a store file
217                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
218                writer.append(mobCell);
219                cellsCountCompactedFromMob++;
220                cellsSizeCompactedFromMob += mobCell.getValueLength();
221              }
222            } else {
223              // Not a MOB reference cell
224              int size = c.getValueLength();
225              if (size > mobSizeThreshold) {
226                mobFileWriter.append(c);
227                writer
228                  .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()));
229                mobCells++;
230                cellsCountCompactedToMob++;
231                cellsSizeCompactedToMob += c.getValueLength();
232              } else {
233                writer.append(c);
234              }
235            }
236          } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) {
237            // Not a major compaction or major with MOB disabled
238            // If the kv type is not put, directly write the cell
239            // to the store file.
240            writer.append(c);
241          } else if (MobUtils.isMobReferenceCell(c)) {
242            // Not a major MOB compaction, Put MOB reference
243            if (MobUtils.hasValidMobRefCellValue(c)) {
244              int size = MobUtils.getMobValueLength(c);
245              if (size > mobSizeThreshold) {
246                // If the value size is larger than the threshold, it's regarded as a mob. Since
247                // its value is already in the mob file, directly write this cell to the store file
248                Optional<TableName> refTable = MobUtils.getTableName(c);
249                if (refTable.isPresent()) {
250                  mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
251                  writer.append(c);
252                } else {
253                  throw new IOException(String.format("MOB cell did not contain a tablename "
254                    + "tag. should not be possible. see ref guide on mob troubleshooting. "
255                    + "store=%s cell=%s", getStoreInfo(), c));
256                }
257              } else {
258                // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
259                // the mob cell from the mob file, and write it back to the store file.
260                mobCell = mobStore.resolve(c, true, false).getCell();
261                if (mobCell.getValueLength() != 0) {
262                  // put the mob data back to the store file
263                  PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
264                  writer.append(mobCell);
265                  cellsCountCompactedFromMob++;
266                  cellsSizeCompactedFromMob += mobCell.getValueLength();
267                } else {
268                  // If the value of a file is empty, there might be issues when retrieving,
269                  // directly write the cell to the store file, and leave it to be handled by the
270                  // next compaction.
271                  LOG.error("Empty value for: " + c);
272                  Optional<TableName> refTable = MobUtils.getTableName(c);
273                  if (refTable.isPresent()) {
274                    mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
275                    writer.append(c);
276                  } else {
277                    throw new IOException(String.format("MOB cell did not contain a tablename "
278                      + "tag. should not be possible. see ref guide on mob troubleshooting. "
279                      + "store=%s cell=%s", getStoreInfo(), c));
280                  }
281                }
282              }
283            } else {
284              LOG.error("Corrupted MOB reference: {}", c);
285              writer.append(c);
286            }
287          } else if (c.getValueLength() <= mobSizeThreshold) {
288            // If the value size of a cell is not larger than the threshold, directly write it to
289            // the store file.
290            writer.append(c);
291          } else {
292            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
293            // write this cell to a mob file, and write the path to the store file.
294            mobCells++;
295            // append the original keyValue in the mob file.
296            mobFileWriter.append(c);
297            Cell reference = MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags());
298            // write the cell whose value is the path of a mob file to the store file.
299            writer.append(reference);
300            cellsCountCompactedToMob++;
301            cellsSizeCompactedToMob += c.getValueLength();
302            // Add ref we get for compact MOB case
303            mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
304          }
305
306          int len = c.getSerializedSize();
307          ++progress.currentCompactedKVs;
308          progress.totalCompactedSize += len;
309          bytesWrittenProgressForShippedCall += len;
310          if (LOG.isDebugEnabled()) {
311            bytesWrittenProgressForLog += len;
312          }
313          throughputController.control(compactionName, len);
314          // check periodically to see if a system stop is requested
315          if (closeCheckSizeLimit > 0) {
316            bytesWrittenProgressForCloseCheck += len;
317            if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
318              bytesWrittenProgressForCloseCheck = 0;
319              if (!store.areWritesEnabled()) {
320                progress.cancel();
321                return false;
322              }
323            }
324          }
325          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
326            ((ShipperListener) writer).beforeShipped();
327            kvs.shipped();
328            bytesWrittenProgressForShippedCall = 0;
329          }
330        }
331        // Log the progress of long running compactions every minute if
332        // logging at DEBUG level
333        if (LOG.isDebugEnabled()) {
334          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
335            String rate = String.format("%.2f",
336              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
337            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
338              compactionName, progress, rate, throughputController);
339            lastMillis = now;
340            bytesWrittenProgressForLog = 0;
341          }
342        }
343        cells.clear();
344      } while (hasMore);
345      finished = true;
346    } catch (InterruptedException e) {
347      progress.cancel();
348      throw new InterruptedIOException(
349        "Interrupted while control throughput of compacting " + compactionName);
350    } catch (FileNotFoundException e) {
351      LOG.error("MOB Stress Test FAILED, region: " + store.getRegionInfo().getEncodedName(), e);
352      System.exit(-1);
353    } catch (IOException t) {
354      LOG.error("Mob compaction failed for region: " + store.getRegionInfo().getEncodedName());
355      throw t;
356    } finally {
357      // Clone last cell in the final because writer will append last cell when committing. If
358      // don't clone here and once the scanner get closed, then the memory of last cell will be
359      // released. (HBASE-22582)
360      ((ShipperListener) writer).beforeShipped();
361      throughputController.finish(compactionName);
362      if (!finished && mobFileWriter != null) {
363        // Remove all MOB references because compaction failed
364        mobRefSet.get().clear();
365        // Abort writer
366        abortWriter(mobFileWriter);
367      }
368    }
369
370    if (mobFileWriter != null) {
371      if (mobCells > 0) {
372        // If the mob file is not empty, commit it.
373        mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
374        mobFileWriter.close();
375        mobStore.commitFile(mobFileWriter.getPath(), path);
376      } else {
377        // If the mob file is empty, delete it instead of committing.
378        abortWriter(mobFileWriter);
379      }
380    }
381    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
382    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
383    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
384    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
385    progress.complete();
386    return true;
387
388  }
389
390}