/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.regex.Matcher;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Base class for all store file trackers.
 * <p/>
 * Mainly used to hold the common logic for skipping persistence on secondary replicas.
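 * <p/>
 * A rough usage sketch. The {@code StoreFileTrackerFactory.create(conf, isPrimaryReplica, ctx)}
 * overload and the local variable names are illustrative assumptions, not a prescribed call
 * sequence:
 *
 * <pre>
 * StoreFileTracker tracker = StoreFileTrackerFactory.create(conf, true, storeContext);
 * List&lt;StoreFileInfo&gt; files = tracker.load();
 * // on a secondary replica (isPrimaryReplica == false) the mutating calls below are no-ops
 * tracker.add(newFiles);
 * tracker.replace(compactedFiles, newFiles);
 * </pre>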
 */
@InterfaceAudience.Private
abstract class StoreFileTrackerBase implements StoreFileTracker {

  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);

  protected final Configuration conf;

  protected final boolean isPrimaryReplica;

  protected final StoreContext ctx;

  private volatile boolean cacheOnWriteLogged;

  protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
    this.conf = conf;
    this.isPrimaryReplica = isPrimaryReplica;
    this.ctx = ctx;
  }

  @Override
  public final List<StoreFileInfo> load() throws IOException {
    return doLoadStoreFiles(!isPrimaryReplica);
  }

  @Override
  public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddNewStoreFiles(newFiles);
    }
  }

  @Override
  public final void replace(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddCompactionResults(compactedFiles, newFiles);
    }
  }

  @Override
  public final void set(List<StoreFileInfo> files) throws IOException {
    if (isPrimaryReplica) {
      doSetStoreFiles(files);
    }
  }

  @Override
  public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
    builder.setValue(TRACKER_IMPL, getTrackerName());
    return builder;
  }

  protected final String getTrackerName() {
    return StoreFileTrackerFactory.getStoreFileTrackerName(getClass());
  }
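
  // A minimal sketch of how updateWithTrackerConfigs above is typically consumed: the tracker
  // implementation name is recorded under the TRACKER_IMPL key on the table descriptor so that
  // later region opens resolve the same implementation. The builder and table name below are
  // hypothetical, shown for illustration only.
  //
  //   TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"));
  //   tracker.updateWithTrackerConfigs(builder);
  //   String impl = builder.build().getValue(TRACKER_IMPL); // e.g. "DEFAULT" or a class name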

  private HFileContext createFileContext(Compression.Algorithm compression,
    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
    if (compression == null) {
      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    ColumnFamilyDescriptor family = ctx.getFamily();
    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
      .withIncludesTags(includesTag).withCompression(compression)
      .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
      .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
      .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
      .withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator())
      .withIndexBlockEncoding(family.getIndexBlockEncoding()).build();
    return hFileContext;
  }

  @Override
  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    if (!isPrimaryReplica) {
      throw new IllegalStateException("Should not call create writer on secondary replicas");
    }
    // creating new cache config for each new writer
    final CacheConfig cacheConf = ctx.getCacheConf();
    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
    long totalCompactedFilesSize = params.totalCompactedFilesSize();
    if (params.isCompaction()) {
      // Don't cache data on write on compactions, unless specifically configured to do so
      // Cache only when total file size remains lower than configured threshold
      final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
      // if data blocks are to be cached on write during compaction, we should forcefully
      // cache index and bloom blocks as well
      if (
        cacheCompactedBlocksOnWrite
          && totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()
      ) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled "
            + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      } else {
        writerCacheConf.setCacheDataOnWrite(false);
        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
          // checking condition once again for logging
          LOG.debug(
            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
              + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
            this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
        }
      }
    } else {
      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
      if (shouldCacheDataOnWrite) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for "
            + "Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      }
    }
    Encryption.Context encryptionContext = ctx.getEncryptionContext();
    HFileContext hFileContext = createFileContext(params.compression(),
      params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
    Path outputDir;
    if (requireWritingToTmpDirFirst()) {
      outputDir =
        new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
    } else {
      outputDir = ctx.getFamilyStoreDirectoryPath();
    }
    StoreFileWriter.Builder builder =
      new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
        .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
        .withFileStoragePolicy(params.fileStoragePolicy())
        .withWriterCreationTracker(params.writerCreationTracker())
        .withMaxVersions(ctx.getMaxVersions()).withNewVersionBehavior(ctx.getNewVersionBehavior())
        .withCellComparator(ctx.getComparator()).withIsCompaction(params.isCompaction());
    return builder.build();
  }
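
  // A hedged sketch of asking the tracker for a flush writer. It assumes a fluent
  // CreateStoreFileWriterParams factory (create() plus same-named setters) mirroring the getters
  // used in createWriter above; treat it as illustrative rather than the canonical call sequence.
  //
  //   StoreFileWriter writer = tracker.createWriter(CreateStoreFileWriterParams.create()
  //     .maxKeyCount(cellCount)
  //     .compression(Compression.Algorithm.NONE)
  //     .isCompaction(false)
  //     .includeMVCCReadpoint(true)
  //     .includesTag(false)
  //     .shouldDropBehind(false));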

  @Override
  public Reference createReference(Reference reference, Path path) throws IOException {
    FSDataOutputStream out = ctx.getRegionFileSystem().getFileSystem().create(path, false);
    try {
      out.write(reference.toByteArray());
    } finally {
      out.close();
    }
    return reference;
  }

  /**
   * Returns true if the store's column family has reference files.
   * @return true if the family directory contains reference files
   */
  public boolean hasReferences() throws IOException {
    Path storeDir = ctx.getRegionFileSystem().getStoreDir(ctx.getFamily().getNameAsString());
    FileStatus[] files =
      CommonFSUtils.listStatus(ctx.getRegionFileSystem().getFileSystem(), storeDir);
    if (files != null) {
      for (FileStatus stat : files) {
        if (stat.isDirectory()) {
          continue;
        }
        if (StoreFileInfo.isReference(stat.getPath())) {
          LOG.trace("Reference {}", stat.getPath());
          return true;
        }
      }
    }
    return false;
  }

  @Override
  public Reference readReference(final Path p) throws IOException {
    InputStream in = ctx.getRegionFileSystem().getFileSystem().open(p);
    try {
      // We need to be able to move back in the stream if this is not a pb serialization so we can
      // do the Writable decoding instead.
      in = in.markSupported() ? in : new BufferedInputStream(in);
      int pblen = ProtobufUtil.lengthOfPBMagic();
      in.mark(pblen);
      byte[] pbuf = new byte[pblen];
      IOUtils.readFully(in, pbuf, 0, pblen);
      // WATCHOUT! Return in middle of function!!!
      if (ProtobufUtil.isPBMagicPrefix(pbuf)) {
        return Reference.convert(
          org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos.Reference.parseFrom(in));
      }
      // Else presume Writables. Need to reset the stream since it didn't start w/ pb.
      // We won't bother rewriting the Reference as a pb since Reference is transitory.
      in.reset();
      Reference r = new Reference();
      DataInputStream dis = new DataInputStream(in);
      // Set in = dis so it gets the close below in the finally on our way out.
      in = dis;
      r.readFields(dis);
      return r;
    } finally {
      in.close();
    }
  }

  @Override
  public StoreFileInfo getStoreFileInfo(Path initialPath, boolean primaryReplica)
    throws IOException {
    return getStoreFileInfo(null, initialPath, primaryReplica);
  }

  @Override
  public StoreFileInfo getStoreFileInfo(FileStatus fileStatus, Path initialPath,
    boolean primaryReplica) throws IOException {
    FileSystem fs = this.ctx.getRegionFileSystem().getFileSystem();
    assert fs != null;
    assert initialPath != null;
    assert conf != null;
    Reference reference = null;
    HFileLink link = null;
    long createdTimestamp = 0;
    long size = 0;
    Path p = initialPath;
    if (HFileLink.isHFileLink(p)) {
      // HFileLink
      reference = null;
      link = HFileLink.buildFromHFileLinkPattern(conf, p);
      LOG.trace("{} is a link", p);
    } else if (StoreFileInfo.isReference(p)) {
      reference = readReference(p);
      Path referencePath = StoreFileInfo.getReferredToFile(p);
      if (HFileLink.isHFileLink(referencePath)) {
        // HFileLink Reference
        link = HFileLink.buildFromHFileLinkPattern(conf, referencePath);
      } else {
        // Reference
        link = null;
      }
      LOG.trace("{} is a {} reference to {}", p, reference.getFileRegion(), referencePath);
    } else
      if (StoreFileInfo.isHFile(p) || StoreFileInfo.isMobFile(p) || StoreFileInfo.isMobRefFile(p)) {
        // HFile
        if (fileStatus != null) {
          createdTimestamp = fileStatus.getModificationTime();
          size = fileStatus.getLen();
        } else {
          FileStatus fStatus = fs.getFileStatus(initialPath);
          createdTimestamp = fStatus.getModificationTime();
          size = fStatus.getLen();
        }
      } else {
        throw new IOException("path=" + p + " doesn't look like a valid StoreFile");
      }
    return new StoreFileInfo(conf, fs, createdTimestamp, initialPath, size, reference, link,
      isPrimaryReplica);
  }
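
  // A short illustration of the resolution above: getStoreFileInfo accepts a plain HFile path, a
  // Reference file or an HFileLink and returns a StoreFileInfo describing it. The directory and
  // file names below are hypothetical.
  //
  //   Path p = new Path(familyDir, storeFileName);
  //   StoreFileInfo info = tracker.getStoreFileInfo(p, true);
  //   boolean isRef = info.isReference();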

  public String createHFileLink(final TableName linkedTable, final String linkedRegion,
    final String hfileName, final boolean createBackRef) throws IOException {
    String name = HFileLink.createHFileLinkName(linkedTable, linkedRegion, hfileName);
    String refName = HFileLink.createBackReferenceName(ctx.getTableName().toString(),
      ctx.getRegionInfo().getEncodedName());

    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    // Make sure the destination directory exists
    fs.mkdirs(ctx.getFamilyStoreDirectoryPath());

    // Make sure the FileLink reference directory exists
    Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf, linkedTable, linkedRegion,
      ctx.getFamily().getNameAsString());
    Path backRefPath = null;
    if (createBackRef) {
      Path backRefssDir = HFileLink.getBackReferencesDir(archiveStoreDir, hfileName);
      fs.mkdirs(backRefssDir);

      // Create the reference for the link
      backRefPath = new Path(backRefssDir, refName);
      fs.createNewFile(backRefPath);
    }
    try {
      // Create the link
      if (fs.createNewFile(new Path(ctx.getFamilyStoreDirectoryPath(), name))) {
        return name;
      }
    } catch (IOException e) {
      LOG.error("couldn't create the link=" + name + " for " + ctx.getFamilyStoreDirectoryPath(),
        e);
      // Revert the reference if the link creation failed
      if (createBackRef) {
        fs.delete(backRefPath, false);
      }
      throw e;
    }
    throw new IOException("File link=" + name + " already exists under "
      + ctx.getFamilyStoreDirectoryPath() + " folder.");
  }

  public String createFromHFileLink(final String hfileLinkName, final boolean createBackRef)
    throws IOException {
    Matcher m = HFileLink.LINK_NAME_PATTERN.matcher(hfileLinkName);
    if (!m.matches()) {
      throw new IllegalArgumentException(hfileLinkName + " is not a valid HFileLink name!");
    }
    return createHFileLink(TableName.valueOf(m.group(1), m.group(2)), m.group(3), m.group(4),
      createBackRef);
  }

  /**
   * For the primary replica, {@link #load()} is called once when opening a region, and the
   * implementation may choose to do some cleanup work at that point. The {@code readOnly} flag
   * indicates whether such cleanup is allowed; for secondary replicas it is always set to
   * {@code true}.
   */
  protected abstract List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException;

  protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;

  protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException;

  protected abstract void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException;

}
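
// A hedged sketch of what a minimal concrete tracker built on this base class could look like.
// The class name and the trivial bodies are hypothetical; see DefaultStoreFileTracker and
// FileBasedStoreFileTracker in this package for the real implementations.
//
//   final class NoopPersistenceTracker extends StoreFileTrackerBase {
//     NoopPersistenceTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
//       super(conf, isPrimaryReplica, ctx);
//     }
//
//     @Override
//     protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException {
//       // list whatever is on the file system; nothing extra is persisted by this sketch
//       return ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
//     }
//
//     @Override
//     protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) {
//       // nothing persisted; the file system listing is the source of truth
//     }
//
//     @Override
//     protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
//       Collection<StoreFileInfo> newFiles) {
//       // nothing persisted
//     }
//
//     @Override
//     protected void doSetStoreFiles(Collection<StoreFileInfo> files) {
//       // nothing persisted
//     }
//
//     @Override
//     public boolean requireWritingToTmpDirFirst() {
//       return true;
//     }
//   }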