001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import java.io.IOException; 021import org.apache.hadoop.fs.CommonConfigurationKeysPublic; 022import org.apache.hadoop.fs.FSDataOutputStream; 023import org.apache.hadoop.fs.FileSystem; 024import org.apache.hadoop.fs.Path; 025import org.apache.hadoop.fs.StreamCapabilities; 026import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 027import org.apache.hadoop.hbase.util.CommonFSUtils; 028import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 029import org.apache.hadoop.hdfs.DistributedFileSystem; 030import org.apache.yetus.audience.InterfaceAudience; 031 032import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 033import org.apache.hbase.thirdparty.io.netty.channel.Channel; 034import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 035 036/** 037 * Helper class for creating AsyncFSOutput. 038 */ 039@InterfaceAudience.Private 040public final class AsyncFSOutputHelper { 041 042 private AsyncFSOutputHelper() { 043 } 044 045 /** 046 * Create {@link FanOutOneBlockAsyncDFSOutput} for {@link DistributedFileSystem}, and a simple 047 * implementation for other {@link FileSystem} which wraps around a {@link FSDataOutputStream}. 048 */ 049 public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite, 050 boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup, 051 Class<? extends Channel> channelClass, StreamSlowMonitor monitor, boolean noLocalWrite) 052 throws IOException, CommonFSUtils.StreamLacksCapabilityException { 053 if (fs instanceof DistributedFileSystem) { 054 return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f, 055 overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor, 056 noLocalWrite); 057 } 058 final FSDataOutputStream out; 059 int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, 060 CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); 061 // This is not a Distributed File System, so it won't be erasure coded; no builder API needed 062 if (createParent) { 063 out = fs.create(f, overwrite, bufferSize, replication, blockSize, null); 064 } else { 065 out = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null); 066 } 067 // After we create the stream but before we attempt to use it at all 068 // ensure that we can provide the level of data safety we're configured 069 // to provide. 070 if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) { 071 if (!out.hasCapability(StreamCapabilities.HFLUSH)) { 072 Closeables.close(out, true); 073 throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH); 074 } 075 if (!out.hasCapability(StreamCapabilities.HSYNC)) { 076 Closeables.close(out, true); 077 throw new StreamLacksCapabilityException(StreamCapabilities.HSYNC); 078 } 079 } 080 return new WrapperAsyncFSOutput(f, out); 081 } 082}