/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Common skeleton shared by the store file tracker implementations.
 * <p/>
 * It centralizes the primary/secondary replica handling: the mutating operations ({@link #add},
 * {@link #replace}, {@link #set}) are forwarded to the subclass only on the primary replica and
 * are silently skipped otherwise, {@link #load()} asks the subclass for a read-only load on
 * secondary replicas, and {@link #createWriter} refuses to run on a secondary replica.
 */
@InterfaceAudience.Private
abstract class StoreFileTrackerBase implements StoreFileTracker {

  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);

  protected final Configuration conf;

  protected final boolean isPrimaryReplica;

  protected final StoreContext ctx;

  /** Set once the "cache on write enabled" INFO line has been emitted, to avoid repeating it. */
  private volatile boolean cacheOnWriteLogged;

  protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
    this.conf = conf;
    this.isPrimaryReplica = isPrimaryReplica;
    this.ctx = ctx;
  }

  /**
   * Loads the store files through {@link #doLoadStoreFiles(boolean)}; the load is read-only
   * whenever this tracker does not belong to the primary replica.
   */
  @Override
  public final List<StoreFileInfo> load() throws IOException {
    final boolean readOnly = !isPrimaryReplica;
    return doLoadStoreFiles(readOnly);
  }

  /** Records newly created store files; a no-op on secondary replicas. */
  @Override
  public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
    if (!isPrimaryReplica) {
      return;
    }
    doAddNewStoreFiles(newFiles);
  }

  /** Records the outcome of a compaction; a no-op on secondary replicas. */
  @Override
  public final void replace(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException {
    if (!isPrimaryReplica) {
      return;
    }
    doAddCompactionResults(compactedFiles, newFiles);
  }

  /** Overwrites the tracked store file list; a no-op on secondary replicas. */
  @Override
  public final void set(List<StoreFileInfo> files) throws IOException {
    if (!isPrimaryReplica) {
      return;
    }
    doSetStoreFiles(files);
  }

  /**
   * Stores this tracker's name under {@code TRACKER_IMPL} in the given builder.
   * @param builder the table descriptor builder to update
   * @return the same {@code builder} instance that was passed in
   */
  @Override
  public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
    final String trackerName = getTrackerName();
    builder.setValue(TRACKER_IMPL, trackerName);
    return builder;
  }

  /** Returns the name {@link StoreFileTrackerFactory} associates with this tracker's class. */
  protected final String getTrackerName() {
    return StoreFileTrackerFactory.getStoreFileTrackerName(getClass());
  }

  /**
   * Assembles the {@link HFileContext} for a new store file from the column family settings, the
   * configuration and the supplied per-writer options.
   * @param compression compression to use; {@code null} selects
   *                    {@link HFile#DEFAULT_COMPRESSION_ALGORITHM}
   */
  private HFileContext createFileContext(Compression.Algorithm compression,
    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
    final Compression.Algorithm algorithm =
      compression == null ? HFile.DEFAULT_COMPRESSION_ALGORITHM : compression;
    final ColumnFamilyDescriptor cf = ctx.getFamily();
    return new HFileContextBuilder()
      .withIncludesMvcc(includeMVCCReadpoint)
      .withIncludesTags(includesTag)
      .withCompression(algorithm)
      .withCompressTags(cf.isCompressTags())
      .withChecksumType(StoreUtils.getChecksumType(conf))
      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
      .withBlockSize(cf.getBlocksize())
      .withHBaseCheckSum(true)
      .withDataBlockEncoding(cf.getDataBlockEncoding())
      .withEncryptionContext(encryptionContext)
      .withCreateTime(EnvironmentEdgeManager.currentTime())
      .withColumnFamily(cf.getName())
      .withTableName(ctx.getTableName().getName())
      .withCellComparator(ctx.getComparator())
      .withIndexBlockEncoding(cf.getIndexBlockEncoding())
      .build();
  }

  /**
   * Builds a {@link StoreFileWriter} for this store.
   * <p/>
   * The writer receives its own {@link CacheConfig}, derived from the store's one and adjusted
   * depending on whether the writer is created for a compaction.
   * @param params the options describing the writer to create
   * @return the newly built writer
   * @throws IllegalStateException if this tracker belongs to a secondary replica
   * @throws IOException           declared by the interface; propagated from the writer builder
   */
  @Override
  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    if (!isPrimaryReplica) {
      throw new IllegalStateException("Should not call create writer on secondary replicas");
    }
    // a dedicated cache config is created for each new writer
    final CacheConfig storeCacheConf = ctx.getCacheConf();
    final CacheConfig writerCacheConf = new CacheConfig(storeCacheConf);
    final long compactedBytes = params.totalCompactedFilesSize();
    if (params.isCompaction()) {
      tuneCacheForCompaction(storeCacheConf, writerCacheConf, compactedBytes);
    } else {
      tuneCacheForNonCompaction(storeCacheConf, writerCacheConf);
    }

    final Encryption.Context encryptionContext = ctx.getEncryptionContext();
    final HFileContext fileContext = createFileContext(params.compression(),
      params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
    final Path outputDir = resolveOutputDir();

    return new StoreFileWriter.Builder(conf, writerCacheConf,
      ctx.getRegionFileSystem().getFileSystem())
      .withOutputDir(outputDir)
      .withBloomType(ctx.getBloomFilterType())
      .withMaxKeyCount(params.maxKeyCount())
      .withFavoredNodes(ctx.getFavoredNodes())
      .withFileContext(fileContext)
      .withShouldDropCacheBehind(params.shouldDropBehind())
      .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
      .withFileStoragePolicy(params.fileStoragePolicy())
      .withWriterCreationTracker(params.writerCreationTracker())
      .build();
  }

  /**
   * Cache-on-write policy for compaction writers: data is not cached on write unless the store is
   * configured to cache compacted blocks AND the total compacted size stays within the configured
   * threshold. When caching is turned on it is enabled for the writer as a whole, so that index
   * and bloom blocks are cached along with the data blocks.
   */
  private void tuneCacheForCompaction(CacheConfig storeCacheConf, CacheConfig writerCacheConf,
    long compactedBytes) {
    if (
      storeCacheConf.shouldCacheCompactedBlocksOnWrite()
        && compactedBytes <= storeCacheConf.getCacheCompactedBlocksOnWriteThreshold()
    ) {
      writerCacheConf.enableCacheOnWrite();
      if (!cacheOnWriteLogged) {
        LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled "
          + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
        cacheOnWriteLogged = true;
      }
      return;
    }
    writerCacheConf.setCacheDataOnWrite(false);
    // the threshold is compared a second time purely to decide whether to log
    if (compactedBytes > storeCacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
      LOG.debug(
        "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
          + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
        this, compactedBytes, storeCacheConf.getCacheCompactedBlocksOnWriteThreshold());
    }
  }

  /**
   * Cache-on-write policy for non-compaction writers: when the store caches data on write, cache
   * on write is enabled for the writer as a whole.
   */
  private void tuneCacheForNonCompaction(CacheConfig storeCacheConf,
    CacheConfig writerCacheConf) {
    if (!storeCacheConf.shouldCacheDataOnWrite()) {
      return;
    }
    writerCacheConf.enableCacheOnWrite();
    if (!cacheOnWriteLogged) {
      LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for "
        + "Index blocks and Bloom filter blocks", this);
      cacheOnWriteLogged = true;
    }
  }

  /**
   * Picks the directory new files are written to: the family's sub-directory under the region's
   * temp dir when {@link #requireWritingToTmpDirFirst()} is true, the family store directory
   * otherwise.
   */
  private Path resolveOutputDir() {
    return requireWritingToTmpDirFirst()
      ? new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString())
      : ctx.getFamilyStoreDirectoryPath();
  }

  /**
   * Loads the store files tracked by the implementation.
   * <p/>
   * On the primary replica this is called once while the region is being opened, and the
   * implementation may take the opportunity to do some cleanup work. {@code readOnly} tells the
   * implementation whether such cleanup is permitted; for secondary replicas it is always
   * {@code true}.
   */
  protected abstract List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException;

  /** Invoked by {@link #add} on the primary replica only. */
  protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;

  /** Invoked by {@link #replace} on the primary replica only. */
  protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException;

  /** Invoked by {@link #set} on the primary replica only. */
  protected abstract void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException;

}