001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import java.io.IOException;
021import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
022import org.apache.hadoop.fs.FSDataOutputStream;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.fs.StreamCapabilities;
026import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
027import org.apache.hadoop.hbase.util.CommonFSUtils;
028import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
029import org.apache.hadoop.hdfs.DistributedFileSystem;
030import org.apache.yetus.audience.InterfaceAudience;
031
032import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
033import org.apache.hbase.thirdparty.io.netty.channel.Channel;
034import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
035
036/**
037 * Helper class for creating AsyncFSOutput.
038 */
039@InterfaceAudience.Private
040public final class AsyncFSOutputHelper {
041
042  private AsyncFSOutputHelper() {
043  }
044
045  /**
046   * Create {@link FanOutOneBlockAsyncDFSOutput} for {@link DistributedFileSystem}, and a simple
047   * implementation for other {@link FileSystem} which wraps around a {@link FSDataOutputStream}.
048   */
049  public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
050    boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
051    Class<? extends Channel> channelClass, StreamSlowMonitor monitor, boolean noLocalWrite)
052    throws IOException, CommonFSUtils.StreamLacksCapabilityException {
053    if (fs instanceof DistributedFileSystem) {
054      return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
055        overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor,
056        noLocalWrite);
057    }
058    final FSDataOutputStream out;
059    int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
060      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
061    // This is not a Distributed File System, so it won't be erasure coded; no builder API needed
062    if (createParent) {
063      out = fs.create(f, overwrite, bufferSize, replication, blockSize, null);
064    } else {
065      out = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null);
066    }
067    // After we create the stream but before we attempt to use it at all
068    // ensure that we can provide the level of data safety we're configured
069    // to provide.
070    if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
071      if (!out.hasCapability(StreamCapabilities.HFLUSH)) {
072        Closeables.close(out, true);
073        throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH);
074      }
075      if (!out.hasCapability(StreamCapabilities.HSYNC)) {
076        Closeables.close(out, true);
077        throw new StreamLacksCapabilityException(StreamCapabilities.HSYNC);
078      }
079    }
080    return new WrapperAsyncFSOutput(f, out);
081  }
082}