/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RowBloomContext;
import org.apache.hadoop.hbase.util.RowColBloomContext;
import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * A StoreFile writer. Use this to read/write HBase Store Files. It is package local because it is
 * an implementation detail of the HBase regionserver.
 */
@InterfaceAudience.Private
public class StoreFileWriter implements CellSink, ShipperListener {
  private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName());
  private static final Pattern dash = Pattern.compile("-");
  private final BloomFilterWriter generalBloomFilterWriter;
  private final BloomFilterWriter deleteFamilyBloomFilterWriter;
  private final BloomType bloomType;
  private byte[] bloomParam = null;
  private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
  private long deleteFamilyCnt = 0;
  private BloomContext bloomContext = null;
  private BloomContext deleteFamilyBloomContext = null;
  private final TimeRangeTracker timeRangeTracker;
  private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;

  protected HFile.Writer writer;

  /**
   * Creates an HFile.Writer that also writes helpful meta data.
   * @param fs                     file system to write to
   * @param path                   file name to create
   * @param conf                   user configuration
   * @param bloomType              bloom filter setting
   * @param maxKeys                the expected maximum number of keys to be added. Was used for
   *                               Bloom filter size in {@link HFile} format version 1.
   * @param favoredNodes           an array of favored nodes or possibly null
   * @param fileContext            The HFile context
   * @param shouldDropCacheBehind  Drop pages written to page cache after writing the store file.
   * @param compactedFilesSupplier Returns the {@link HStore} compacted files which are not yet
   *                               archived
   * @throws IOException problem writing to FS
   */
  private StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf,
    BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes, HFileContext fileContext,
    boolean shouldDropCacheBehind, Supplier<Collection<HStoreFile>> compactedFilesSupplier)
    throws IOException {
    this.compactedFilesSupplier = compactedFilesSupplier;
    this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
    // TODO : Change all writers to be specifically created for compaction context
    writer =
      HFile.getWriterFactory(conf, cacheConf).withPath(fs, path).withFavoredNodes(favoredNodes)
        .withFileContext(fileContext).withShouldDropCacheBehind(shouldDropCacheBehind).create();

    generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf,
      bloomType, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

    if (generalBloomFilterWriter != null) {
      this.bloomType = bloomType;
      this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: "
          + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH
            ? Bytes.toInt(bloomParam)
            : Bytes.toStringBinary(bloomParam))
          + ", " + generalBloomFilterWriter.getClass().getSimpleName());
      }
      // init bloom context
      switch (bloomType) {
        case ROW:
          bloomContext =
            new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
          break;
        case ROWCOL:
          bloomContext =
            new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
          break;
        case ROWPREFIX_FIXED_LENGTH:
          bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter,
            fileContext.getCellComparator(), Bytes.toInt(bloomParam));
          break;
        default:
          throw new IOException(
            "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL or ROWPREFIX expected)");
      }
    } else {
      // Not using Bloom filters.
      this.bloomType = BloomType.NONE;
    }

    // initialize delete family Bloom filter when there is NO RowCol Bloom filter
    if (this.bloomType != BloomType.ROWCOL) {
      this.deleteFamilyBloomFilterWriter = BloomFilterFactory.createDeleteBloomAtWrite(conf,
        cacheConf, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
      deleteFamilyBloomContext =
        new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator());
    } else {
      deleteFamilyBloomFilterWriter = null;
    }
    if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
      LOG.trace("Delete Family Bloom filter type for " + path + ": "
        + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
    }
  }

  public long getPos() throws IOException {
    return ((HFileWriterImpl) writer).getPos();
  }

  /**
   * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
    throws IOException {
    appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet());
  }

  /**
   * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @param storeFiles      The compacted store files to generate this new file
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final Collection<HStoreFile> storeFiles) throws IOException {
    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
    writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
    appendTrackedTimestampsToMetadata();
  }

  /**
   * Used when writing {@link HStoreFile#COMPACTION_EVENT_KEY} to the new file's file info. The
   * names of the compacted store files are needed. If a compacted store file is itself the result
   * of a compaction, its own compacted files that are still not archived are needed too, but
   * there is no need to add compacted files recursively. If files A, B and C were compacted into
   * new file D, and file D was compacted into new file E, then A, B, C and D are written to file
   * E's compacted files. So if file E is compacted into new file F, E is added to F's compacted
   * files first, then E's compacted files A, B, C and D are added to it. There is no need to add
   * D's compacted files, as they are already among E's compacted files. See HBASE-20724 for more
   * details.
   * @param storeFiles The compacted store files to generate this new file
   * @return bytes of CompactionEventTracker
   */
  private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) {
    Set<String> notArchivedCompactedStoreFiles = this.compactedFilesSupplier.get().stream()
      .map(sf -> sf.getPath().getName()).collect(Collectors.toSet());
    Set<String> compactedStoreFiles = new HashSet<>();
    for (HStoreFile storeFile : storeFiles) {
      compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
      for (String csf : storeFile.getCompactedStoreFiles()) {
        if (notArchivedCompactedStoreFiles.contains(csf)) {
          compactedStoreFiles.add(csf);
        }
      }
    }
    return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
  }

  /**
   * Writes meta data. Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId   Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @param mobCellsCount   The number of mob cells.
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final long mobCellsCount) throws IOException {
    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
    writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
    appendTrackedTimestampsToMetadata();
  }

  /**
   * Appends MOB-specific metadata (even if it is empty).
   * @param mobRefSet - original table -> set of MOB file names
   * @throws IOException problem writing to FS
   */
  public void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
    writer.appendFileInfo(MOB_FILE_REFS, MobUtils.serializeMobFileRefs(mobRefSet));
  }

  /**
   * Add TimestampRange and earliest put timestamp to Metadata
   */
  public void appendTrackedTimestampsToMetadata() throws IOException {
    // TODO: The StoreFileReader always converts the byte[] to TimeRange
    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
    appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
    appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
  }

  /**
   * Record the earliest Put timestamp. If the timeRangeTracker is not set, update TimeRangeTracker
   * to include the timestamp of this key
   */
  public void trackTimestamps(final Cell cell) {
    if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
      earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
    }
    timeRangeTracker.includeTimestamp(cell);
  }

  private void appendGeneralBloomfilter(final Cell cell) throws IOException {
    if (this.generalBloomFilterWriter != null) {
      /*
       * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
       * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp
       * 3 Types of Filtering:
       *  1. Row = Row
       *  2. RowCol = Row + Qualifier
       *  3. RowPrefixFixedLength = Fixed Length Row Prefix
       */
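      /*
       * Illustrative sketch of the three filtering types above, using hypothetical values (a
       * cell with row "row-0001" and qualifier "q1", and an assumed prefix length of 4): the
       * general Bloom entry is built from "row-0001" for ROW, from "row-0001" plus "q1" for
       * ROWCOL, and from the first 4 bytes "row-" for ROWPREFIX_FIXED_LENGTH. The BloomContext
       * selected in the constructor derives the actual key from the cell.
       */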
      bloomContext.writeBloom(cell);
    }
  }

  private void appendDeleteFamilyBloomFilter(final Cell cell) throws IOException {
    if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) {
      return;
    }

    // increase the number of delete family in the store file
    deleteFamilyCnt++;
    if (this.deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomContext.writeBloom(cell);
    }
  }

  @Override
  public void append(final Cell cell) throws IOException {
    appendGeneralBloomfilter(cell);
    appendDeleteFamilyBloomFilter(cell);
    writer.append(cell);
    trackTimestamps(cell);
  }

  @Override
  public void beforeShipped() throws IOException {
    // For now these writers will always be ShipperListener instances.
    // TODO : Change all writers to be specifically created for compaction context
    writer.beforeShipped();
    if (generalBloomFilterWriter != null) {
      generalBloomFilterWriter.beforeShipped();
    }
    if (deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomFilterWriter.beforeShipped();
    }
  }

  public Path getPath() {
    return this.writer.getPath();
  }

  public boolean hasGeneralBloom() {
    return this.generalBloomFilterWriter != null;
  }

  /**
   * For unit testing only.
   * @return the Bloom filter used by this writer.
   */
  BloomFilterWriter getGeneralBloomWriter() {
    return generalBloomFilterWriter;
  }

  private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
    boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
    if (haveBloom) {
      bfw.compactBloom();
    }
    return haveBloom;
  }

  private boolean closeGeneralBloomFilter() throws IOException {
    boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

    // add the general Bloom filter writer and append file info
    if (hasGeneralBloom) {
      writer.addGeneralBloomFilter(generalBloomFilterWriter);
      writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
      if (bloomParam != null) {
        writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam);
      }
      bloomContext.addLastBloomKey(writer);
    }
    return hasGeneralBloom;
  }

  private boolean closeDeleteFamilyBloomFilter() throws IOException {
    boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

    // add the delete family Bloom filter writer
    if (hasDeleteFamilyBloom) {
      writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
    }

    // append file info about the number of delete family kvs
    // even if there is no delete family Bloom.
    writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt));

    return hasDeleteFamilyBloom;
  }

  public void close() throws IOException {
    boolean hasGeneralBloom = this.closeGeneralBloomFilter();
    boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

    writer.close();

    // Log final Bloom filter statistics. This needs to be done after close()
    // because compound Bloom filters might be finalized as part of closing.
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        (hasGeneralBloom ? "" : "NO ") + "General Bloom and " + (hasDeleteFamilyBloom ? "" : "NO ")
          + "DeleteFamily" + " was added to HFile " + getPath());
    }

  }

  public void appendFileInfo(byte[] key, byte[] value) throws IOException {
    writer.appendFileInfo(key, value);
  }

  /**
   * For use in testing.
   */
  HFile.Writer getHFileWriter() {
    return writer;
  }

  /**
   * @param dir Directory to create file in.
   * @return random filename inside passed <code>dir</code>
   */
  public static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IOException("Expecting " + dir.toString() + " to be a directory");
    }
    return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll(""));
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
      justification = "Will not overflow")
  public static class Builder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    private Path dir;
    private Path filePath;
    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;
    private boolean shouldDropCacheBehind;
    private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
    private String fileStoragePolicy;
    // this is used to track the creation of the StoreFileWriter, mainly used for the SFT
    // implementation where we will write store files directly to the final place, instead of
    // writing a tmp file first. Under this scenario, we will have a background task to purge the
    // store files which are not recorded in the SFT, but for the newly created store file writer,
    // they are not tracked in SFT, so here we need to record them and treat them specially.
    private Consumer<Path> writerCreationTracker;

    public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Creates Builder with cache configuration disabled
     */
    public Builder(Configuration conf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = CacheConfig.DISABLED;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if it does not exist.
     *            The file is given a unique name within this directory.
     * @return this (for chained invocation)
     */
    public Builder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }

    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public Builder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.filePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

    public Builder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public Builder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    public Builder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
      this.shouldDropCacheBehind = shouldDropCacheBehind;
      return this;
    }

    public Builder
      withCompactedFilesSupplier(Supplier<Collection<HStoreFile>> compactedFilesSupplier) {
      this.compactedFilesSupplier = compactedFilesSupplier;
      return this;
    }

    public Builder withFileStoragePolicy(String fileStoragePolicy) {
      this.fileStoragePolicy = fileStoragePolicy;
      return this;
    }

    public Builder withWriterCreationTracker(Consumer<Path> writerCreationTracker) {
      this.writerCreationTracker = writerCreationTracker;
      return this;
    }
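
    /*
     * A minimal usage sketch of this builder (illustrative only; the conf, fs, familyDir,
     * fileContext, expectedKeys, cells and maxSeqId names are assumed to exist in the caller,
     * and error handling is omitted):
     *
     *   StoreFileWriter writer = new StoreFileWriter.Builder(conf, fs)
     *     .withOutputDir(familyDir)          // or .withFilePath(explicitPath)
     *     .withBloomType(BloomType.ROW)
     *     .withMaxKeyCount(expectedKeys)
     *     .withFileContext(fileContext)
     *     .build();
     *   try {
     *     for (Cell cell : cells) {
     *       writer.append(cell);
     *     }
     *     writer.appendMetadata(maxSeqId, false); // metadata must be appended before close()
     *   } finally {
     *     writer.close();
     *   }
     */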

    /**
     * Create a store file writer. Client is responsible for closing file when done. If metadata,
     * add BEFORE closing using {@link StoreFileWriter#appendMetadata}.
     */
    public StoreFileWriter build() throws IOException {
      if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory or file path");
      }

      if (dir == null) {
        dir = filePath.getParent();
      }

      if (!fs.exists(dir)) {
        // Handle permission for non-HDFS filesystem properly
        // See HBASE-17710
        HRegionFileSystem.mkdirs(fs, conf, dir);
      }

      // set block storage policy for temp path
      String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY);
      if (null == policyName) {
        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
      }
      CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);

      if (filePath == null) {
        // The store file and its blocks will use the directory-based StoragePolicy, because
        // HDFS DistributedFileSystem does not support creating files with a storage policy
        // before version 3.3.0 (see HDFS-13209). Using a child dir here makes the store files
        // satisfy the specific storage policy while they are being written, so as to avoid
        // later data movement. We don't want to change the whole temp dir to 'fileStoragePolicy'.
        if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
          dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
          if (!fs.exists(dir)) {
            HRegionFileSystem.mkdirs(fs, conf, dir);
            LOG.info(
              "Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
          }
          CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
        }
        filePath = getUniqueFile(fs, dir);
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }
      // Make sure we call this before actually creating the writer. In fact, it is not a big
      // deal to even add a nonexistent file to the tracker, as we will never try to delete it
      // and we will clean the tracker up after compaction. But if the file cleaner finds the
      // file while we haven't recorded it yet, it may accidentally delete the file and cause
      // problems.
      if (writerCreationTracker != null) {
        writerCreationTracker.accept(filePath);
      }
      return new StoreFileWriter(fs, filePath, conf, cacheConf, bloomType, maxKeyCount,
        favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
    }
  }
}