/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.regex.Matcher;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
 * Base class for all store file trackers.
 * <p/>
 * Mainly used to hold the common logic that skips persistence for secondary replicas.
 */
@InterfaceAudience.Private
abstract class StoreFileTrackerBase implements StoreFileTracker {

  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);

  protected final Configuration conf;

  protected final boolean isPrimaryReplica;

  protected final StoreContext ctx;

  private volatile boolean cacheOnWriteLogged;

  protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
    this.conf = conf;
    this.isPrimaryReplica = isPrimaryReplica;
    this.ctx = ctx;
  }

  @Override
  public final List<StoreFileInfo> load() throws IOException {
    return doLoadStoreFiles(!isPrimaryReplica);
  }

  @Override
  public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddNewStoreFiles(newFiles);
    }
  }

  @Override
  public final void replace(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddCompactionResults(compactedFiles, newFiles);
    }
  }

  @Override
  public final void set(List<StoreFileInfo> files) throws IOException {
    if (isPrimaryReplica) {
      doSetStoreFiles(files);
    }
  }

  @Override
  public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
    builder.setValue(TRACKER_IMPL, getTrackerName());
    return builder;
  }

  protected final String getTrackerName() {
    return StoreFileTrackerFactory.getStoreFileTrackerName(getClass());
  }

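  /**
   * Builds the {@link HFileContext} for a new store file from the column family descriptor and the
   * store context (compression, checksum settings, block size, encoding, comparator, etc.).
   */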
  private HFileContext createFileContext(Compression.Algorithm compression,
    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
    if (compression == null) {
      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    ColumnFamilyDescriptor family = ctx.getFamily();
    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
      .withIncludesTags(includesTag).withCompression(compression)
      .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
      .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
      .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
      .withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator())
      .withIndexBlockEncoding(family.getIndexBlockEncoding()).build();
    return hFileContext;
  }

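  /**
   * Creates a writer for a new store file. Only valid on the primary replica; cache-on-write is
   * enabled or disabled based on the store's {@link CacheConfig}, and the output location is either
   * the region temp dir or the family dir, depending on {@link #requireWritingToTmpDirFirst()}.
   */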
  @Override
  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    if (!isPrimaryReplica) {
      throw new IllegalStateException("Should not call create writer on secondary replicas");
    }
    // create a new cache config for each new writer
    final CacheConfig cacheConf = ctx.getCacheConf();
    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
    long totalCompactedFilesSize = params.totalCompactedFilesSize();
    if (params.isCompaction()) {
      // Don't cache data on write on compactions, unless specifically configured to do so.
      // Cache only when the total compacted file size stays below the configured threshold.
      final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
      // if data blocks are to be cached on write during compaction, we should forcefully
      // cache index and bloom blocks as well
      if (
        cacheCompactedBlocksOnWrite
          && totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()
      ) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {}, cacheCompactedBlocksOnWrite is true, hence enabled "
            + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      } else {
        writerCacheConf.setCacheDataOnWrite(false);
        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
          // checking the condition once again for logging
          LOG.debug(
            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
              + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
            this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
        }
      }
    } else {
      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
      if (shouldCacheDataOnWrite) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {}, cacheDataOnWrite is true, hence enabled cacheOnWrite for "
            + "Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      }
    }
    Encryption.Context encryptionContext = ctx.getEncryptionContext();
    HFileContext hFileContext = createFileContext(params.compression(),
      params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
    Path outputDir;
    if (requireWritingToTmpDirFirst()) {
      outputDir =
        new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
    } else {
      outputDir = ctx.getFamilyStoreDirectoryPath();
    }
    StoreFileWriter.Builder builder =
      new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
        .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
        .withFileStoragePolicy(params.fileStoragePolicy())
        .withWriterCreationTracker(params.writerCreationTracker())
        .withMaxVersions(ctx.getMaxVersions()).withNewVersionBehavior(ctx.getNewVersionBehavior())
        .withCellComparator(ctx.getComparator()).withIsCompaction(params.isCompaction());
    return builder.build();
  }

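  /**
   * Persists the given {@link Reference} as protobuf bytes to the given path. The file is created
   * with overwrite disabled, so an already existing file at that path results in an IOException.
   */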
  @Override
  public Reference createReference(Reference reference, Path path) throws IOException {
    FSDataOutputStream out = ctx.getRegionFileSystem().getFileSystem().create(path, false);
    try {
      out.write(reference.toByteArray());
    } finally {
      out.close();
    }
    return reference;
  }

  /**
   * Returns true if this store contains reference files.
   * @return true if the store's family directory contains at least one reference file
   */
  public boolean hasReferences() throws IOException {
    Path storeDir = ctx.getRegionFileSystem().getStoreDir(ctx.getFamily().getNameAsString());
    FileStatus[] files =
      CommonFSUtils.listStatus(ctx.getRegionFileSystem().getFileSystem(), storeDir);
    if (files != null) {
      for (FileStatus stat : files) {
        if (stat.isDirectory()) {
          continue;
        }
        if (StoreFileInfo.isReference(stat.getPath())) {
          LOG.trace("Reference {}", stat.getPath());
          return true;
        }
      }
    }
    return false;
  }

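  /**
   * Reads a {@link Reference} from the given path, handling both the protobuf serialization and
   * the legacy Writable serialization (detected by the absence of the pb magic prefix).
   */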
  @Override
  public Reference readReference(final Path p) throws IOException {
    InputStream in = ctx.getRegionFileSystem().getFileSystem().open(p);
    try {
      // I need to be able to move back in the stream if this is not a pb serialization so I can
      // do the Writable decoding instead.
      in = in.markSupported() ? in : new BufferedInputStream(in);
      int pblen = ProtobufUtil.lengthOfPBMagic();
      in.mark(pblen);
      byte[] pbuf = new byte[pblen];
      IOUtils.readFully(in, pbuf, 0, pblen);
      // WATCHOUT! Return in middle of function!!!
      if (ProtobufUtil.isPBMagicPrefix(pbuf)) {
        return Reference.convert(
          org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos.Reference.parseFrom(in));
      }
      // Else presume Writables. Need to reset the stream since it didn't start w/ pb.
      // We won't bother rewriting the Reference as a pb since Reference is transitory.
      in.reset();
      Reference r = new Reference();
      DataInputStream dis = new DataInputStream(in);
      // Set in = dis so it gets the close below in the finally on our way out.
      in = dis;
      r.readFields(dis);
      return r;
    } finally {
      in.close();
    }
  }

  @Override
  public StoreFileInfo getStoreFileInfo(Path initialPath, boolean primaryReplica)
    throws IOException {
    return getStoreFileInfo(null, initialPath, primaryReplica);
  }

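  /**
   * Creates a {@link StoreFileInfo} for the given path, resolving HFile links and references along
   * the way. For plain HFiles, size and creation timestamp are taken from {@code fileStatus} if
   * provided, otherwise looked up from the file system.
   */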
  @Override
  public StoreFileInfo getStoreFileInfo(FileStatus fileStatus, Path initialPath,
    boolean primaryReplica) throws IOException {
    FileSystem fs = this.ctx.getRegionFileSystem().getFileSystem();
    assert fs != null;
    assert initialPath != null;
    assert conf != null;
    Reference reference = null;
    HFileLink link = null;
    long createdTimestamp = 0;
    long size = 0;
    Path p = initialPath;
    if (HFileLink.isHFileLink(p)) {
      // HFileLink
      reference = null;
      link = HFileLink.buildFromHFileLinkPattern(conf, p);
      LOG.trace("{} is a link", p);
    } else if (StoreFileInfo.isReference(p)) {
      reference = readReference(p);
      Path referencePath = StoreFileInfo.getReferredToFile(p);
      if (HFileLink.isHFileLink(referencePath)) {
        // HFileLink Reference
        link = HFileLink.buildFromHFileLinkPattern(conf, referencePath);
      } else {
        // Reference
        link = null;
      }
      LOG.trace("{} is a {} reference to {}", p, reference.getFileRegion(), referencePath);
    } else if (
      StoreFileInfo.isHFile(p) || StoreFileInfo.isMobFile(p) || StoreFileInfo.isMobRefFile(p)
    ) {
      // HFile
      if (fileStatus != null) {
        createdTimestamp = fileStatus.getModificationTime();
        size = fileStatus.getLen();
      } else {
        FileStatus fStatus = fs.getFileStatus(initialPath);
        createdTimestamp = fStatus.getModificationTime();
        size = fStatus.getLen();
      }
    } else {
      throw new IOException("path=" + p + " doesn't look like a valid StoreFile");
    }
    return new StoreFileInfo(conf, fs, createdTimestamp, initialPath, size, reference, link,
      isPrimaryReplica);
  }

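  /**
   * Creates a new HFileLink in this store's family directory that points to the given hfile of the
   * given table/region. If {@code createBackRef} is true, a back reference file is also created
   * under the archive store directory of the linked file.
   * @return the name of the newly created link file
   * @throws IOException if the link could not be created or already exists
   */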
  public String createHFileLink(final TableName linkedTable, final String linkedRegion,
    final String hfileName, final boolean createBackRef) throws IOException {
    String name = HFileLink.createHFileLinkName(linkedTable, linkedRegion, hfileName);
    String refName = HFileLink.createBackReferenceName(ctx.getTableName().toString(),
      ctx.getRegionInfo().getEncodedName());

    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    // Make sure the destination directory exists
    fs.mkdirs(ctx.getFamilyStoreDirectoryPath());

    // Make sure the FileLink reference directory exists
    Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf, linkedTable, linkedRegion,
      ctx.getFamily().getNameAsString());
    Path backRefPath = null;
    if (createBackRef) {
      Path backRefsDir = HFileLink.getBackReferencesDir(archiveStoreDir, hfileName);
      fs.mkdirs(backRefsDir);

      // Create the reference for the link
      backRefPath = new Path(backRefsDir, refName);
      fs.createNewFile(backRefPath);
    }
    try {
      // Create the link
      if (fs.createNewFile(new Path(ctx.getFamilyStoreDirectoryPath(), name))) {
        return name;
      }
    } catch (IOException e) {
      LOG.error("couldn't create the link=" + name + " for " + ctx.getFamilyStoreDirectoryPath(),
        e);
      // Revert the reference if the link creation failed
      if (createBackRef) {
        fs.delete(backRefPath, false);
      }
      throw e;
    }
    throw new IOException("File link=" + name + " already exists under "
      + ctx.getFamilyStoreDirectoryPath() + " folder.");
  }

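  /**
   * Creates an HFileLink from an existing HFileLink name, parsing the linked table, region and
   * hfile out of {@code hfileLinkName} and delegating to
   * {@link #createHFileLink(TableName, String, String, boolean)}.
   */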
  public String createFromHFileLink(final String hfileLinkName, final boolean createBackRef)
    throws IOException {
    Matcher m = HFileLink.LINK_NAME_PATTERN.matcher(hfileLinkName);
    if (!m.matches()) {
      throw new IllegalArgumentException(hfileLinkName + " is not a valid HFileLink name!");
    }
    return createHFileLink(TableName.valueOf(m.group(1), m.group(2)), m.group(3), m.group(4),
      createBackRef);
  }

  /**
   * For the primary replica, we will call load once when opening a region, and the implementation
   * could choose to do some cleanup work. So here we use {@code readOnly} to indicate whether the
   * implementation is allowed to do that cleanup. For secondary replicas, {@code readOnly} will
   * always be {@code true}.
   */
  protected abstract List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException;

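  /**
   * Records the given new store files in the tracker. Only called on the primary replica.
   */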
  protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;

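  /**
   * Replaces the given compacted store files with the files produced by the compaction in the
   * tracker. Only called on the primary replica.
   */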
  protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException;

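  /**
   * Replaces the whole tracked store file list with the given files. Only called on the primary
   * replica.
   */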
  protected abstract void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException;

}