/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class is used for testing only. The main purpose is to emulate random failures during MOB
 * compaction process. Example of usage:
 *
 * <pre>
 * {@code
 * public class SomeTest {
 *
 *   public void initConfiguration(Configuration conf) {
 *     conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY,
 *       FaultyMobStoreCompactor.class.getName());
 *     conf.setDouble("hbase.mob.compaction.fault.probability", 0.1);
 *   }
 * }
 * }
 * </pre>
 *
 * @see org.apache.hadoop.hbase.mob.MobStressToolRunner on how to use and configure this class.
 */
@InterfaceAudience.Private
public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {

  private static final Logger LOG = LoggerFactory.getLogger(FaultyMobStoreCompactor.class);

  /** Number of compactions that were both major and user-requested (the "compact MOBs" case). */
  public static AtomicLong mobCounter = new AtomicLong();
  /**
   * Number of compactions that were <em>selected</em> to fail. The counter is bumped when the
   * failure is scheduled, not when the fault is thrown, so it is an upper bound on the number of
   * faults actually injected.
   */
  public static AtomicLong totalFailures = new AtomicLong();
  /** Number of calls to {@code performCompaction}. */
  public static AtomicLong totalCompactions = new AtomicLong();
  /** Number of calls to {@code performCompaction} whose request covered all files. */
  public static AtomicLong totalMajorCompactions = new AtomicLong();

  /**
   * Probability that a user-requested major compaction is selected to fail. This is static and is
   * overwritten by every constructor call, so the most recently constructed instance decides the
   * value for all instances.
   */
  static double failureProb = 0.1d;

  /**
   * Creates the compactor and reads the fault probability from
   * {@code hbase.mob.compaction.fault.probability} (default 0.1) into the shared static field.
   * @param conf  configuration handed to the parent compactor and used for the fault probability
   * @param store store handed to the parent compactor
   */
  public FaultyMobStoreCompactor(Configuration conf, HStore store) {
    super(conf, store);
    failureProb = conf.getDouble("hbase.mob.compaction.fault.probability", 0.1);
  }

  /**
   * Drains {@code scanner} into {@code writer}, moving values above {@code mobSizeThreshold} into
   * a newly created MOB file, and optionally throws an injected {@link CorruptHFileException}.
   * <p>
   * A fault is only considered for user-requested major compactions. With probability
   * {@link #failureProb} a cell position in {@code [1, 100]} is drawn; the fault is thrown if the
   * cell at exactly that position (counting every scanned cell from 1) is a MOB reference cell. If
   * it is not, or the scan ends earlier, the compaction completes normally.
   * <p>
   * A {@link FileNotFoundException} escaping the scan loop is treated as a failed stress test: it
   * is logged and the JVM is terminated with {@code System.exit(-1)}.
   * @param fd                   file details; {@code latestPutTs}, {@code maxKeyCount} and
   *                             {@code maxSeqId} are read here
   * @param scanner              source of cells to compact
   * @param writer               sink for the compacted store file; also cast to
   *                             {@link ShipperListener}
   * @param smallestReadPoint    not read by this implementation
   * @param cleanSeqId           not read by this implementation
   * @param throughputController throttles the compaction; started before and finished after the
   *                             scan
   * @param request              compaction request; decides whether this is a major compaction
   * @param progress             progress tracker updated per cell and cancelled on abort
   * @return {@code true} when the scanner was fully drained; {@code false} when the close checker
   *         reported a time or size limit (progress is cancelled first)
   * @throws IOException on injected faults, on failures creating or writing the MOB file, on
   *                     unresolved MOB references, or (as {@link InterruptedIOException}) when
   *                     throughput control is interrupted
   */
  @Override
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
    CompactionRequestImpl request, CompactionProgress progress) throws IOException {

    boolean major = request.isAllFiles();
    totalCompactions.incrementAndGet();
    if (major) {
      totalMajorCompactions.incrementAndGet();
    }
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Clear old mob references
    mobRefSet.get().clear();
    boolean isUserRequest = userRequest.get();
    boolean compactMOBs = major && isUserRequest;
    boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
      MobConstants.DEFAULT_MOB_DISCARD_MISS);

    // Decide up front whether this compaction is a candidate for an injected fault.
    boolean mustFail = false;
    if (compactMOBs) {
      mobCounter.incrementAndGet();
      double dv = ThreadLocalRandom.current().nextDouble();
      if (dv < failureProb) {
        mustFail = true;
        totalFailures.incrementAndGet();
      }
    }

    FileSystem fs = store.getFileSystem();

    List<Cell> cells = new ArrayList<>();
    long currentTime = EnvironmentEdgeManager.currentTime();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = currentTime;
    }
    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
    byte[] fileName = null;
    StoreFileWriter mobFileWriter = null;
    long mobCells = 0;
    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
    boolean finished = false;

    // Limit each scanner.next() batch to compactionKVMax cells to avoid OOME
    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
      .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
        compactScannerSizeLimit)
      .build();
    throughputController.start(compactionName);
    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
    long shippedCallSizeLimit =
      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();

    ExtendedCell mobCell = null;

    long counter = 0;
    long countFailAt = -1;
    if (mustFail) {
      // Randomly fail fast. The position is 1-based because 'counter' is incremented before it
      // is compared, so a drawn value of 0 could never match and the fault would silently never
      // fire.
      countFailAt = ThreadLocalRandom.current().nextInt(1, 101);
    }

    try {
      try {
        mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
          major ? majorCompactionCompression : minorCompactionCompression,
          store.getRegionInfo().getStartKey(), true);
        fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
      } catch (IOException e) {
        // Bailing out
        LOG.error("Failed to create mob writer, ", e);
        throw e;
      }
      if (compactMOBs) {
        // Add the only reference we get for compact MOB case
        // because new store file will have only one MOB reference
        // in this case - of newly compacted MOB file
        mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
      }
      // Since scanner.next() can return 'false' but still be delivering data,
      // we have to use a do/while loop.
      do {
        hasMore = scanner.next(cells, scannerContext);
        currentTime = EnvironmentEdgeManager.currentTime();
        if (LOG.isDebugEnabled()) {
          now = currentTime;
        }
        if (closeChecker.isTimeLimit(store, currentTime)) {
          progress.cancel();
          return false;
        }
        for (Cell cell : cells) {
          ExtendedCell c = (ExtendedCell) cell;
          counter++;
          if (compactMOBs) {
            if (MobUtils.isMobReferenceCell(c)) {
              // The fault is only checked on MOB reference cells, at the exact drawn position.
              if (counter == countFailAt) {
                LOG.warn("INJECTED FAULT mobCounter={}", mobCounter.get());
                throw new CorruptHFileException("injected fault");
              }
              String fName = MobUtils.getMobFileName(c);
              // Added to support migration
              try {
                mobCell = mobStore.resolve(c, true, false).getCell();
              } catch (DoNotRetryIOException e) {
                if (
                  discardMobMiss && e.getCause() != null
                    && e.getCause() instanceof FileNotFoundException
                ) {
                  LOG.error("Missing MOB cell: file={} not found cell={}", fName, c);
                  continue;
                } else {
                  throw e;
                }
              }

              if (discardMobMiss && mobCell.getValueLength() == 0) {
                LOG.error("Missing MOB cell value: file={} cell={}", fName, mobCell);
                continue;
              }

              if (mobCell.getValueLength() > mobSizeThreshold) {
                // put the mob data back to the store file
                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
                mobFileWriter.append(mobCell);
                writer.append(
                  MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
                mobCells++;
              } else {
                // If MOB value is less than threshold, append it directly to a store file
                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
                writer.append(mobCell);
                cellsCountCompactedFromMob++;
                cellsSizeCompactedFromMob += mobCell.getValueLength();
              }
            } else {
              // Not a MOB reference cell
              int size = c.getValueLength();
              if (size > mobSizeThreshold) {
                mobFileWriter.append(c);
                writer
                  .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()));
                mobCells++;
                cellsCountCompactedToMob++;
                cellsSizeCompactedToMob += c.getValueLength();
              } else {
                writer.append(c);
              }
            }
          } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) {
            // Not a major compaction or major with MOB disabled
            // If the kv type is not put, directly write the cell
            // to the store file.
            writer.append(c);
          } else if (MobUtils.isMobReferenceCell(c)) {
            // Not a major MOB compaction, Put MOB reference
            if (MobUtils.hasValidMobRefCellValue(c)) {
              int size = MobUtils.getMobValueLength(c);
              if (size > mobSizeThreshold) {
                // If the value size is larger than the threshold, it's regarded as a mob. Since
                // its value is already in the mob file, directly write this cell to the store file
                Optional<TableName> refTable = MobUtils.getTableName(c);
                if (refTable.isPresent()) {
                  mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
                  writer.append(c);
                } else {
                  throw new IOException(String.format("MOB cell did not contain a tablename "
                    + "tag. should not be possible. see ref guide on mob troubleshooting. "
                    + "store=%s cell=%s", getStoreInfo(), c));
                }
              } else {
                // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
                // the mob cell from the mob file, and write it back to the store file.
                mobCell = mobStore.resolve(c, true, false).getCell();
                if (mobCell.getValueLength() != 0) {
                  // put the mob data back to the store file
                  PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
                  writer.append(mobCell);
                  cellsCountCompactedFromMob++;
                  cellsSizeCompactedFromMob += mobCell.getValueLength();
                } else {
                  // If the value of a file is empty, there might be issues when retrieving,
                  // directly write the cell to the store file, and leave it to be handled by the
                  // next compaction.
                  LOG.error("Empty value for: " + c);
                  Optional<TableName> refTable = MobUtils.getTableName(c);
                  if (refTable.isPresent()) {
                    mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
                    writer.append(c);
                  } else {
                    throw new IOException(String.format("MOB cell did not contain a tablename "
                      + "tag. should not be possible. see ref guide on mob troubleshooting. "
                      + "store=%s cell=%s", getStoreInfo(), c));
                  }
                }
              }
            } else {
              LOG.error("Corrupted MOB reference: {}", c);
              writer.append(c);
            }
          } else if (c.getValueLength() <= mobSizeThreshold) {
            // If the value size of a cell is not larger than the threshold, directly write it to
            // the store file.
            writer.append(c);
          } else {
            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
            // write this cell to a mob file, and write the path to the store file.
            mobCells++;
            // append the original keyValue in the mob file.
            mobFileWriter.append(c);
            ExtendedCell reference =
              MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags());
            // write the cell whose value is the path of a mob file to the store file.
            writer.append(reference);
            cellsCountCompactedToMob++;
            cellsSizeCompactedToMob += c.getValueLength();
            // Add ref we get for compact MOB case
            mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
          }

          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          if (closeChecker.isSizeLimit(store, len)) {
            progress.cancel();
            return false;
          }
          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
            ((ShipperListener) writer).beforeShipped();
            kvs.shipped();
            bytesWrittenProgressForShippedCall = 0;
          }
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
      finished = true;
    } catch (InterruptedException e) {
      progress.cancel();
      // InterruptedIOException has no cause-taking constructor; attach the cause explicitly so
      // the original interrupt is not lost from the stack trace.
      InterruptedIOException iioe = new InterruptedIOException(
        "Interrupted while control throughput of compacting " + compactionName);
      iioe.initCause(e);
      throw iioe;
    } catch (FileNotFoundException e) {
      // Stress-test behaviour: a missing file at this point is treated as a failed test run.
      LOG.error("MOB Stress Test FAILED, region: " + store.getRegionInfo().getEncodedName(), e);
      System.exit(-1);
    } catch (IOException t) {
      LOG.error("Mob compaction failed for region: " + store.getRegionInfo().getEncodedName());
      throw t;
    } finally {
      try {
        // Clone last cell in the final because writer will append last cell when committing. If
        // don't clone here and once the scanner get closed, then the memory of last cell will be
        // released. (HBASE-22582)
        ((ShipperListener) writer).beforeShipped();
      } finally {
        // Run the cleanup even if beforeShipped() throws, so the throughput controller is always
        // finished and a partially written MOB file is always aborted.
        throughputController.finish(compactionName);
        if (!finished && mobFileWriter != null) {
          // Remove all MOB references because compaction failed
          mobRefSet.get().clear();
          // Abort writer
          abortWriter(mobFileWriter);
        }
      }
    }

    if (mobFileWriter != null) {
      if (mobCells > 0) {
        // If the mob file is not empty, commit it.
        mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
        mobFileWriter.close();
        mobStore.commitFile(mobFileWriter.getPath(), path);
      } else {
        // If the mob file is empty, delete it instead of committing.
        abortWriter(mobFileWriter);
      }
    }
    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
    progress.complete();
    return true;

  }

}