001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.util.ArrayList; 024import java.util.Date; 025import java.util.List; 026import java.util.Optional; 027import java.util.Random; 028import java.util.concurrent.atomic.AtomicLong; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.KeyValue; 033import org.apache.hadoop.hbase.PrivateCellUtil; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.io.hfile.CorruptHFileException; 036import org.apache.hadoop.hbase.regionserver.CellSink; 037import org.apache.hadoop.hbase.regionserver.HStore; 038import org.apache.hadoop.hbase.regionserver.InternalScanner; 039import org.apache.hadoop.hbase.regionserver.KeyValueScanner; 040import org.apache.hadoop.hbase.regionserver.ScannerContext; 041import org.apache.hadoop.hbase.regionserver.ShipperListener; 042import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 043import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker; 044import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; 045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 046import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; 047import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 050import org.apache.yetus.audience.InterfaceAudience; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054/** 055 * This class is used for testing only. The main purpose is to emulate random failures during MOB 056 * compaction process. Example of usage: 057 * 058 * <pre> 059 * { 060 * @code 061 * public class SomeTest { 062 * 063 * public void initConfiguration(Configuration conf) { 064 * conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY, 065 * FaultyMobStoreCompactor.class.getName()); 066 * conf.setDouble("hbase.mob.compaction.fault.probability", 0.1); 067 * } 068 * } 069 * } 070 * </pre> 071 * 072 * @see org.apache.hadoop.hbase.mob.MobStressToolRunner on how to use and configure this class. 073 */ 074@InterfaceAudience.Private 075public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor { 076 077 private static final Logger LOG = LoggerFactory.getLogger(FaultyMobStoreCompactor.class); 078 079 public static AtomicLong mobCounter = new AtomicLong(); 080 public static AtomicLong totalFailures = new AtomicLong(); 081 public static AtomicLong totalCompactions = new AtomicLong(); 082 public static AtomicLong totalMajorCompactions = new AtomicLong(); 083 084 static double failureProb = 0.1d; 085 static Random rnd = new Random(); 086 087 public FaultyMobStoreCompactor(Configuration conf, HStore store) { 088 super(conf, store); 089 failureProb = conf.getDouble("hbase.mob.compaction.fault.probability", 0.1); 090 } 091 092 @Override 093 protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, 094 long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, 095 CompactionRequestImpl request, CompactionProgress progress) throws IOException { 096 097 boolean major = request.isAllFiles(); 098 totalCompactions.incrementAndGet(); 099 if (major) { 100 totalMajorCompactions.incrementAndGet(); 101 } 102 long bytesWrittenProgressForCloseCheck = 0; 103 long bytesWrittenProgressForLog = 0; 104 long bytesWrittenProgressForShippedCall = 0; 105 // Clear old mob references 106 mobRefSet.get().clear(); 107 boolean isUserRequest = userRequest.get(); 108 boolean compactMOBs = major && isUserRequest; 109 boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY, 110 MobConstants.DEFAULT_MOB_DISCARD_MISS); 111 112 boolean mustFail = false; 113 if (compactMOBs) { 114 mobCounter.incrementAndGet(); 115 double dv = rnd.nextDouble(); 116 if (dv < failureProb) { 117 mustFail = true; 118 totalFailures.incrementAndGet(); 119 } 120 } 121 122 // Since scanner.next() can return 'false' but still be delivering data, 123 // we have to use a do/while loop. 124 List<Cell> cells = new ArrayList<>(); 125 // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME 126 int closeCheckSizeLimit = 127 conf.getInt(CloseChecker.SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */); 128 long lastMillis = 0; 129 if (LOG.isDebugEnabled()) { 130 lastMillis = EnvironmentEdgeManager.currentTime(); 131 } 132 String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction"); 133 long now = 0; 134 boolean hasMore; 135 Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); 136 byte[] fileName = null; 137 StoreFileWriter mobFileWriter = null; 138 long mobCells = 0; 139 long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0; 140 long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0; 141 boolean finished = false; 142 143 ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax) 144 .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE, 145 compactScannerSizeLimit) 146 .build(); 147 throughputController.start(compactionName); 148 KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null; 149 long shippedCallSizeLimit = 150 (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize(); 151 152 Cell mobCell = null; 153 154 long counter = 0; 155 long countFailAt = -1; 156 if (mustFail) { 157 countFailAt = rnd.nextInt(100); // randomly fail fast 158 } 159 160 try { 161 try { 162 mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, 163 major ? majorCompactionCompression : minorCompactionCompression, 164 store.getRegionInfo().getStartKey(), true); 165 fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); 166 } catch (IOException e) { 167 // Bailing out 168 LOG.error("Failed to create mob writer, ", e); 169 throw e; 170 } 171 if (compactMOBs) { 172 // Add the only reference we get for compact MOB case 173 // because new store file will have only one MOB reference 174 // in this case - of newly compacted MOB file 175 mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName()); 176 } 177 do { 178 hasMore = scanner.next(cells, scannerContext); 179 if (LOG.isDebugEnabled()) { 180 now = EnvironmentEdgeManager.currentTime(); 181 } 182 for (Cell c : cells) { 183 counter++; 184 if (compactMOBs) { 185 if (MobUtils.isMobReferenceCell(c)) { 186 if (counter == countFailAt) { 187 LOG.warn("INJECTED FAULT mobCounter={}", mobCounter.get()); 188 throw new CorruptHFileException("injected fault"); 189 } 190 String fName = MobUtils.getMobFileName(c); 191 // Added to support migration 192 try { 193 mobCell = mobStore.resolve(c, true, false).getCell(); 194 } catch (FileNotFoundException fnfe) { 195 if (discardMobMiss) { 196 LOG.error("Missing MOB cell: file={} not found", fName); 197 continue; 198 } else { 199 throw fnfe; 200 } 201 } 202 203 if (discardMobMiss && mobCell.getValueLength() == 0) { 204 LOG.error("Missing MOB cell value: file={} cell={}", fName, mobCell); 205 continue; 206 } 207 208 if (mobCell.getValueLength() > mobSizeThreshold) { 209 // put the mob data back to the store file 210 PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); 211 mobFileWriter.append(mobCell); 212 writer.append( 213 MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags())); 214 mobCells++; 215 } else { 216 // If MOB value is less than threshold, append it directly to a store file 217 PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); 218 writer.append(mobCell); 219 cellsCountCompactedFromMob++; 220 cellsSizeCompactedFromMob += mobCell.getValueLength(); 221 } 222 } else { 223 // Not a MOB reference cell 224 int size = c.getValueLength(); 225 if (size > mobSizeThreshold) { 226 mobFileWriter.append(c); 227 writer 228 .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags())); 229 mobCells++; 230 cellsCountCompactedToMob++; 231 cellsSizeCompactedToMob += c.getValueLength(); 232 } else { 233 writer.append(c); 234 } 235 } 236 } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) { 237 // Not a major compaction or major with MOB disabled 238 // If the kv type is not put, directly write the cell 239 // to the store file. 240 writer.append(c); 241 } else if (MobUtils.isMobReferenceCell(c)) { 242 // Not a major MOB compaction, Put MOB reference 243 if (MobUtils.hasValidMobRefCellValue(c)) { 244 int size = MobUtils.getMobValueLength(c); 245 if (size > mobSizeThreshold) { 246 // If the value size is larger than the threshold, it's regarded as a mob. Since 247 // its value is already in the mob file, directly write this cell to the store file 248 Optional<TableName> refTable = MobUtils.getTableName(c); 249 if (refTable.isPresent()) { 250 mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c)); 251 writer.append(c); 252 } else { 253 throw new IOException(String.format("MOB cell did not contain a tablename " 254 + "tag. should not be possible. see ref guide on mob troubleshooting. " 255 + "store=%s cell=%s", getStoreInfo(), c)); 256 } 257 } else { 258 // If the value is not larger than the threshold, it's not regarded a mob. Retrieve 259 // the mob cell from the mob file, and write it back to the store file. 260 mobCell = mobStore.resolve(c, true, false).getCell(); 261 if (mobCell.getValueLength() != 0) { 262 // put the mob data back to the store file 263 PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); 264 writer.append(mobCell); 265 cellsCountCompactedFromMob++; 266 cellsSizeCompactedFromMob += mobCell.getValueLength(); 267 } else { 268 // If the value of a file is empty, there might be issues when retrieving, 269 // directly write the cell to the store file, and leave it to be handled by the 270 // next compaction. 271 LOG.error("Empty value for: " + c); 272 Optional<TableName> refTable = MobUtils.getTableName(c); 273 if (refTable.isPresent()) { 274 mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c)); 275 writer.append(c); 276 } else { 277 throw new IOException(String.format("MOB cell did not contain a tablename " 278 + "tag. should not be possible. see ref guide on mob troubleshooting. " 279 + "store=%s cell=%s", getStoreInfo(), c)); 280 } 281 } 282 } 283 } else { 284 LOG.error("Corrupted MOB reference: {}", c); 285 writer.append(c); 286 } 287 } else if (c.getValueLength() <= mobSizeThreshold) { 288 // If the value size of a cell is not larger than the threshold, directly write it to 289 // the store file. 290 writer.append(c); 291 } else { 292 // If the value size of a cell is larger than the threshold, it's regarded as a mob, 293 // write this cell to a mob file, and write the path to the store file. 294 mobCells++; 295 // append the original keyValue in the mob file. 296 mobFileWriter.append(c); 297 Cell reference = MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()); 298 // write the cell whose value is the path of a mob file to the store file. 299 writer.append(reference); 300 cellsCountCompactedToMob++; 301 cellsSizeCompactedToMob += c.getValueLength(); 302 // Add ref we get for compact MOB case 303 mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName()); 304 } 305 306 int len = c.getSerializedSize(); 307 ++progress.currentCompactedKVs; 308 progress.totalCompactedSize += len; 309 bytesWrittenProgressForShippedCall += len; 310 if (LOG.isDebugEnabled()) { 311 bytesWrittenProgressForLog += len; 312 } 313 throughputController.control(compactionName, len); 314 // check periodically to see if a system stop is requested 315 if (closeCheckSizeLimit > 0) { 316 bytesWrittenProgressForCloseCheck += len; 317 if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) { 318 bytesWrittenProgressForCloseCheck = 0; 319 if (!store.areWritesEnabled()) { 320 progress.cancel(); 321 return false; 322 } 323 } 324 } 325 if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { 326 ((ShipperListener) writer).beforeShipped(); 327 kvs.shipped(); 328 bytesWrittenProgressForShippedCall = 0; 329 } 330 } 331 // Log the progress of long running compactions every minute if 332 // logging at DEBUG level 333 if (LOG.isDebugEnabled()) { 334 if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) { 335 String rate = String.format("%.2f", 336 (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0)); 337 LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}", 338 compactionName, progress, rate, throughputController); 339 lastMillis = now; 340 bytesWrittenProgressForLog = 0; 341 } 342 } 343 cells.clear(); 344 } while (hasMore); 345 finished = true; 346 } catch (InterruptedException e) { 347 progress.cancel(); 348 throw new InterruptedIOException( 349 "Interrupted while control throughput of compacting " + compactionName); 350 } catch (FileNotFoundException e) { 351 LOG.error("MOB Stress Test FAILED, region: " + store.getRegionInfo().getEncodedName(), e); 352 System.exit(-1); 353 } catch (IOException t) { 354 LOG.error("Mob compaction failed for region: " + store.getRegionInfo().getEncodedName()); 355 throw t; 356 } finally { 357 // Clone last cell in the final because writer will append last cell when committing. If 358 // don't clone here and once the scanner get closed, then the memory of last cell will be 359 // released. (HBASE-22582) 360 ((ShipperListener) writer).beforeShipped(); 361 throughputController.finish(compactionName); 362 if (!finished && mobFileWriter != null) { 363 // Remove all MOB references because compaction failed 364 mobRefSet.get().clear(); 365 // Abort writer 366 abortWriter(mobFileWriter); 367 } 368 } 369 370 if (mobFileWriter != null) { 371 if (mobCells > 0) { 372 // If the mob file is not empty, commit it. 373 mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells); 374 mobFileWriter.close(); 375 mobStore.commitFile(mobFileWriter.getPath(), path); 376 } else { 377 // If the mob file is empty, delete it instead of committing. 378 abortWriter(mobFileWriter); 379 } 380 } 381 mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob); 382 mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob); 383 mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob); 384 mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob); 385 progress.complete(); 386 return true; 387 388 } 389 390}