/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.addListener;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;

import com.google.protobuf.CodedOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

/**
 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
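 * <p>
 * A minimal usage sketch (illustrative only; the event loop wiring and the {@code write} /
 * {@code flush} / {@code recoverAndClose} calls on the returned stream are assumptions made for
 * this example, not definitions from this class):
 *
 * <pre>
 * EventLoopGroup group = new NioEventLoopGroup();
 * StreamSlowMonitor monitor = ...; // obtained from the surrounding monitor setup
 * FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(dfs,
 *   new Path("/example/file"), true, false, (short) 3, dfs.getDefaultBlockSize(), group,
 *   NioSocketChannel.class, monitor, false);
 * out.write(data); // data is some byte[] payload
 * out.flush(false).get(); // wait for the datanodes to ack
 * out.recoverAndClose(null);
 * </pre>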
 */
@InterfaceAudience.Private
public final class FanOutOneBlockAsyncDFSOutputHelper {
  private static final Logger LOG =
    LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);

  private FanOutOneBlockAsyncDFSOutputHelper() {
  }

  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";

  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
  // use pooled allocator for performance.
  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

  // copied from DFSPacket since it is package private.
  public static final long HEART_BEAT_SEQNO = -1L;

  // Timeouts for communicating with DataNode for streaming writes/reads
  public static final int READ_TIMEOUT = 60 * 1000;

  private interface LeaseManager {

    void begin(DFSClient client, long inodeId);

    void end(DFSClient client, long inodeId);
  }

  private static final LeaseManager LEASE_MANAGER;

  // This is used to terminate a recoverFileLease call when FileSystem is already closed.
  // isClientRunning is not public so we need to use reflection.
  private interface DFSClientAdaptor {

    boolean isClientRunning(DFSClient client);
  }

  private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;

  // helper class for creating files.
  private interface FileCreator {
    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, CryptoProtocolVersion[] supportedVersions) throws Exception {
      try {
        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
          replication, blockSize, supportedVersions);
      } catch (InvocationTargetException e) {
        if (e.getCause() instanceof Exception) {
          throw (Exception) e.getCause();
        } else {
          throw new RuntimeException(e.getCause());
        }
      }
    };

    Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
      EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions) throws Exception;
  }

  private static final FileCreator FILE_CREATOR;

  // CreateFlag.SHOULD_REPLICATE is used to make an OutputStream on an EC directory support
  // hflush/hsync, but EC was only introduced in hadoop 3.x, so this enum does not exist on 2.x.
  // That is why we have to look it up by name at runtime instead of referencing it directly.
  private static final CreateFlag SHOULD_REPLICATE_FLAG;

  private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
    Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
    isClientRunningMethod.setAccessible(true);
    return new DFSClientAdaptor() {

      @Override
      public boolean isClientRunning(DFSClient client) {
        try {
          return (Boolean) isClientRunningMethod.invoke(client);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
      DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
    endFileLeaseMethod.setAccessible(true);
    return new LeaseManager() {

      @Override
      public void begin(DFSClient client, long inodeId) {
        try {
          beginFileLeaseMethod.invoke(client, inodeId, null);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(DFSClient client, long inodeId) {
        try {
          endFileLeaseMethod.invoke(client, inodeId);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static FileCreator createFileCreator3_3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null, null);
    };
  }

  private static FileCreator createFileCreator3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null);
    };
  }

  private static FileCreator createFileCreator2() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions);
    };
  }

  private static FileCreator createFileCreator() throws NoSuchMethodException {
    try {
      return createFileCreator3_3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 3.2 or below");
    }

    try {
      return createFileCreator3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x");
    }
    return createFileCreator2();
  }

  private static CreateFlag loadShouldReplicateFlag() {
    try {
      return CreateFlag.valueOf("SHOULD_REPLICATE");
    } catch (IllegalArgumentException e) {
      LOG.debug("can not find SHOULD_REPLICATE flag, should be hadoop 2.x", e);
      return null;
    }
  }

  // cancel the processing if DFSClient is already closed.
  static final class CancelOnClose implements CancelableProgressable {

    private final DFSClient client;

    public CancelOnClose(DFSClient client) {
      this.client = client;
    }

    @Override
    public boolean progress() {
      return DFS_CLIENT_ADAPTOR.isClientRunning(client);
    }
  }

  static {
    try {
      LEASE_MANAGER = createLeaseManager();
      DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
      FILE_CREATOR = createFileCreator();
      SHOULD_REPLICATE_FLAG = loadShouldReplicateFlag();
    } catch (Exception e) {
      String msg = "Couldn't properly initialize access to HDFS internals. Please "
        + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
        + "HBASE-16110 for more information.";
      LOG.error(msg, e);
      throw new Error(msg, e);
    }
  }
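  // Illustrative note, based on the error message above rather than anything this class defines:
  // switching the WAL provider away from 'asyncfs' (for example hbase.wal.provider=filesystem in
  // hbase-site.xml) avoids this initialization entirely. See HBASE-16110.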

  static void beginFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.begin(client, inodeId);
  }

  static void endFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.end(client, inodeId);
  }

  static DataChecksum createChecksum(DFSClient client) {
    return client.getConf().createChecksum(null);
  }

  static Status getStatus(PipelineAckProto ack) {
    List<Integer> flagList = ack.getFlagList();
    Integer headerFlag;
    if (flagList.isEmpty()) {
      Status reply = ack.getReply(0);
      headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply);
    } else {
      headerFlag = flagList.get(0);
    }
    return PipelineAck.getStatusFromHeader(headerFlag);
  }

  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
    Promise<Channel> promise, int timeoutMs) {
    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
      new ProtobufVarint32FrameDecoder(),
      new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
      new SimpleChannelInboundHandler<BlockOpResponseProto>() {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
          throws Exception {
          Status pipelineStatus = resp.getStatus();
          if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
            throw new IOException("datanode " + dnInfo + " is restarting");
          }
          String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
          if (resp.getStatus() != Status.SUCCESS) {
            if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
              throw new InvalidBlockTokenException("Got access token error" + ", status message "
                + resp.getMessage() + ", " + logInfo);
            } else {
              throw new IOException("Got error" + ", status=" + resp.getStatus().name()
                + ", status message " + resp.getMessage() + ", " + logInfo);
            }
          }
          // success
          ChannelPipeline p = ctx.pipeline();
          for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
            // do not remove all handlers because we may have wrap or unwrap handlers at the head
            // of the pipeline.
            if (handler instanceof IdleStateHandler) {
              break;
            }
          }
          // Disable auto read here. Enable it after we set up the streaming pipeline in
          // FanOutOneBlockAsyncDFSOutput.
          ctx.channel().config().setAutoRead(false);
          promise.trySuccess(ctx.channel());
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
          if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
            promise
              .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
          } else {
            super.userEventTriggered(ctx, evt);
          }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          promise.tryFailure(cause);
        }
      });
  }

  private static void requestWriteBlock(Channel channel, StorageType storageType,
    OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
    OpWriteBlockProto proto =
      writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build();
    int protoLen = proto.getSerializedSize();
    // The request frame is: 2-byte data transfer protocol version, 1-byte WRITE_BLOCK op code,
    // then the varint length-delimited OpWriteBlockProto, hence the "3 +" when sizing the buffer.
    ByteBuf buffer =
      channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
    buffer.writeByte(Op.WRITE_BLOCK.code);
    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
    safeWriteAndFlush(channel, buffer);
  }

  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
    StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
    DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
    throws IOException {
    Promise<Void> saslPromise = channel.eventLoop().newPromise();
    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
    addListener(saslPromise, new FutureListener<Void>() {

      @Override
      public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
          // Set up the response processing pipeline first, then send the request.
          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
        } else {
          promise.tryFailure(future.cause());
        }
      }
    });
  }

  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
    String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
    BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass) {
    StorageType[] storageTypes = locatedBlock.getStorageTypes();
    DatanodeInfo[] datanodeInfos = getLocatedBlockLocations(locatedBlock);
    boolean connectToDnViaHostname =
      conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
    blockCopy.setNumBytes(locatedBlock.getBlockSize());
    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
      .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy))
        .setToken(PBHelperClient.convert(locatedBlock.getBlockToken())))
      .setClientName(clientName).build();
    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
    OpWriteBlockProto.Builder writeBlockProtoBuilder =
      OpWriteBlockProto.newBuilder().setHeader(header)
        .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())).setPipelineSize(1)
        .setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()).setMaxBytesRcvd(maxBytesRcvd)
        .setLatestGenerationStamp(latestGS).setRequestedChecksum(checksumProto)
        .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
    for (int i = 0; i < datanodeInfos.length; i++) {
      DatanodeInfo dnInfo = datanodeInfos[i];
      StorageType storageType = storageTypes[i];
      Promise<Channel> promise = eventLoopGroup.next().newPromise();
      futureList.add(promise);
      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
      addListener(new Bootstrap().group(eventLoopGroup).channel(channelClass)
        .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {

          @Override
          protected void initChannel(Channel ch) throws Exception {
            // We need the remote address of the channel, so we can only move on after the channel
            // is connected. Leave an empty implementation here because netty does not allow a null
            // handler.
          }
        }).connect(NetUtils.createSocketAddr(dnAddr)), new ChannelFutureListener() {

          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
              initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
                timeoutMs, client, locatedBlock.getBlockToken(), promise);
            } else {
              promise.tryFailure(future.cause());
            }
          }
        });
    }
    return futureList;
  }

  /**
   * Exception other than RemoteException thrown when calling create on namenode
   */
  public static class NameNodeException extends IOException {

    private static final long serialVersionUID = 3143237406477095390L;

    public NameNodeException(Throwable cause) {
      super(cause);
    }
  }

  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite,
    boolean noLocalWrite) {
    List<CreateFlag> flags = new ArrayList<>();
    flags.add(CreateFlag.CREATE);
    if (overwrite) {
      flags.add(CreateFlag.OVERWRITE);
    }
    if (SHOULD_REPLICATE_FLAG != null) {
      flags.add(SHOULD_REPLICATE_FLAG);
    }
    if (noLocalWrite) {
      flags.add(CreateFlag.NO_LOCAL_WRITE);
    }
    return new EnumSetWritable<>(EnumSet.copyOf(flags));
  }

  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor,
    boolean noLocalWrite) throws IOException {
    Configuration conf = dfs.getConf();
    DFSClient client = dfs.getClient();
    String clientName = client.getClientName();
    ClientProtocol namenode = client.getNamenode();
    int createMaxRetries =
      conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
    ExcludeDatanodeManager excludeDatanodeManager = monitor.getExcludeDatanodeManager();
    Set<DatanodeInfo> toExcludeNodes =
      new HashSet<>(excludeDatanodeManager.getExcludeDNs().keySet());
    for (int retry = 0;; retry++) {
536      LOG.debug("When create output stream for {}, exclude list is {}, retry={}", src,
537        toExcludeNodes, retry);
      HdfsFileStatus stat;
      try {
        stat = FILE_CREATOR.create(namenode, src,
          FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
          getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize,
          CryptoProtocolVersion.supported());
      } catch (Exception e) {
        if (e instanceof RemoteException) {
          throw (RemoteException) e;
        } else {
          throw new NameNodeException(e);
        }
      }
      beginFileLease(client, stat.getFileId());
      boolean succ = false;
      LocatedBlock locatedBlock = null;
      List<Future<Channel>> futureList = null;
      try {
        DataChecksum summer = createChecksum(client);
        locatedBlock = namenode.addBlock(src, client.getClientName(), null,
          toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null);
        Map<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
          PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
        for (int i = 0, n = futureList.size(); i < n; i++) {
          DatanodeInfo datanodeInfo = getLocatedBlockLocations(locatedBlock)[i];
          try {
            datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo);
          } catch (Exception e) {
            // exclude the broken DN next time
            toExcludeNodes.add(datanodeInfo);
            excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "connect error");
            throw e;
          }
        }
        Encryptor encryptor = createEncryptor(conf, stat, client);
        FanOutOneBlockAsyncDFSOutput output =
          new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
            stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
        succ = true;
        return output;
      } catch (RemoteException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (shouldRetryCreate(e)) {
          if (retry >= createMaxRetries) {
            throw e.unwrapRemoteException();
          }
        } else {
          throw e.unwrapRemoteException();
        }
      } catch (IOException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (retry >= createMaxRetries) {
          throw e;
        }
        // overwrite the old broken file.
        overwrite = true;
        try {
          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
        } catch (InterruptedException ie) {
          throw new InterruptedIOException();
        }
      } finally {
        if (!succ) {
          if (futureList != null) {
            for (Future<Channel> f : futureList) {
              addListener(f, new FutureListener<Channel>() {

                @Override
                public void operationComplete(Future<Channel> future) throws Exception {
                  if (future.isSuccess()) {
                    safeClose(future.getNow());
                  }
                }
              });
            }
          }
          endFileLease(client, stat.getFileId());
        }
      }
    }
  }

  /**
   * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method may block, so do not call it inside
   * an {@link EventLoop}.
   */
  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
    final StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException {
    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

      @Override
      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
        throws IOException, UnresolvedLinkException {
        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
          blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
      }

      @Override
      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
        throw new UnsupportedOperationException();
      }
    }.resolve(dfs, f);
  }

  public static boolean shouldRetryCreate(RemoteException e) {
    // RetryStartFileException was introduced in HDFS 2.6+, so here we can only check the class
    // name. For other exceptions we simply rethrow, which is the same behavior as
    // DFSOutputStream.newStreamForCreate.
    return e.getClassName().endsWith("RetryStartFileException");
  }

  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
    ExtendedBlock block, long fileId) {
    for (int retry = 0;; retry++) {
      try {
        if (namenode.complete(src, clientName, block, fileId)) {
          endFileLease(client, fileId);
          return;
        } else {
          LOG.warn("complete file " + src + " not finished, retry = " + retry);
        }
      } catch (RemoteException e) {
        IOException ioe = e.unwrapRemoteException();
        if (ioe instanceof LeaseExpiredException) {
          LOG.warn("lease for file " + src + " is expired, give up", e);
          return;
        } else {
          LOG.warn("complete file " + src + " failed, retry = " + retry, e);
        }
      } catch (Exception e) {
        LOG.warn("complete file " + src + " failed, retry = " + retry, e);
      }
      sleepIgnoreInterrupt(retry);
    }
  }

  static void sleepIgnoreInterrupt(int retry) {
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
    } catch (InterruptedException e) {
    }
  }
}