/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.addListener;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

/**
 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
 */
@InterfaceAudience.Private
public final class FanOutOneBlockAsyncDFSOutputHelper {
  private static final Logger LOG =
    LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);

  private FanOutOneBlockAsyncDFSOutputHelper() {
  }

  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";

  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
  // use pooled allocator for performance.
  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

  // copied from DFSPacket since it is package private.
  public static final long HEART_BEAT_SEQNO = -1L;

  // Timeouts for communicating with DataNode for streaming writes/reads
  public static final int READ_TIMEOUT = 60 * 1000;

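  // helper interface for beginning/ending the file lease of an output stream. Implemented via
  // reflection because the relevant DFSClient methods are not public and their signatures differ
  // across Hadoop versions.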
  private interface LeaseManager {

    void begin(DFSClient client, HdfsFileStatus stat);

    void end(DFSClient client, HdfsFileStatus stat);
  }

  private static final LeaseManager LEASE_MANAGER;

  // helper interface for creating files.
  private interface FileCreator {
    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, CryptoProtocolVersion[] supportedVersions) throws Exception {
      try {
        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
          replication, blockSize, supportedVersions);
      } catch (InvocationTargetException e) {
        if (e.getCause() instanceof Exception) {
          throw (Exception) e.getCause();
        } else {
          throw new RuntimeException(e.getCause());
        }
      }
    }

    Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
      EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions) throws Exception;
  }

  private static final FileCreator FILE_CREATOR;

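  // Hadoop 3.4.0+ identifies a file lease by a unique String key derived from the namespace and
  // the file id, rather than by the numeric file id alone.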
  private static LeaseManager createLeaseManager3_4() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
      DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", String.class);
    endFileLeaseMethod.setAccessible(true);
    Method getConfigurationMethod = DFSClient.class.getDeclaredMethod("getConfiguration");
    getConfigurationMethod.setAccessible(true);
    Method getNamespaceMethod = HdfsFileStatus.class.getDeclaredMethod("getNamespace");

    return new LeaseManager() {

      private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY =
        "dfs.client.output.stream.uniq.default.key";
      private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT";

      private String getUniqId(DFSClient client, HdfsFileStatus stat)
        throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
        // Copied from DFSClient in Hadoop 3.4.0
        long fileId = stat.getFileId();
        String namespace = (String) getNamespaceMethod.invoke(stat);
        if (namespace == null) {
          Configuration conf = (Configuration) getConfigurationMethod.invoke(client);
          String defaultKey = conf.get(DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY,
            DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT);
          return defaultKey + "_" + fileId;
        } else {
          return namespace + "_" + fileId;
        }
      }

      @Override
      public void begin(DFSClient client, HdfsFileStatus stat) {
        try {
          beginFileLeaseMethod.invoke(client, getUniqId(client, stat), null);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(DFSClient client, HdfsFileStatus stat) {
        try {
          endFileLeaseMethod.invoke(client, getUniqId(client, stat));
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

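  // Hadoop 3.3.x and earlier identify a file lease by the numeric file id.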
  private static LeaseManager createLeaseManager3() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
      DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
    endFileLeaseMethod.setAccessible(true);
    return new LeaseManager() {

      @Override
      public void begin(DFSClient client, HdfsFileStatus stat) {
        try {
          beginFileLeaseMethod.invoke(client, stat.getFileId(), null);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(DFSClient client, HdfsFileStatus stat) {
        try {
          endFileLeaseMethod.invoke(client, stat.getFileId());
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
    try {
      return createLeaseManager3_4();
    } catch (NoSuchMethodException e) {
      LOG.debug("DFSClient::beginFileLease wrong arguments, should be hadoop 3.3 or below");
    }

    return createLeaseManager3();
  }

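  // Hadoop 3.3+ version of ClientProtocol.create takes two extra String parameters; pass null for
  // both to use the defaults.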
  private static FileCreator createFileCreator3_3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null, null);
    };
  }

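  // Hadoop 3.2 and earlier version of ClientProtocol.create takes one extra String parameter;
  // pass null to use the default.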
  private static FileCreator createFileCreator3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null);
    };
  }

  private static FileCreator createFileCreator() throws NoSuchMethodException {
    try {
      return createFileCreator3_3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 3.2 or below");
    }

    return createFileCreator3();
  }

  // cancel the processing if DFSClient is already closed.
  static final class CancelOnClose implements CancelableProgressable {

    private final DFSClient client;

    public CancelOnClose(DFSClient client) {
      this.client = client;
    }

    @Override
    public boolean progress() {
      return client.isClientRunning();
    }
  }

  static {
    try {
      LEASE_MANAGER = createLeaseManager();
      FILE_CREATOR = createFileCreator();
    } catch (Exception e) {
      String msg = "Couldn't properly initialize access to HDFS internals. Please "
        + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
        + "HBASE-16110 for more information.";
      LOG.error(msg, e);
      throw new Error(msg, e);
    }
  }

  static void beginFileLease(DFSClient client, HdfsFileStatus stat) {
    LEASE_MANAGER.begin(client, stat);
  }

  static void endFileLease(DFSClient client, HdfsFileStatus stat) {
    LEASE_MANAGER.end(client, stat);
  }

  static DataChecksum createChecksum(DFSClient client) {
    return client.getConf().createChecksum(null);
  }

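  // Extract the reply status from a pipeline ack, preferring the combined header flag when the
  // datanode sends one.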
  static Status getStatus(PipelineAckProto ack) {
    List<Integer> flagList = ack.getFlagList();
    Integer headerFlag;
    if (flagList.isEmpty()) {
      Status reply = ack.getReply(0);
      headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply);
    } else {
      headerFlag = flagList.get(0);
    }
    return PipelineAck.getStatusFromHeader(headerFlag);
  }

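  // Set up handlers that decode the datanode's BlockOpResponseProto and complete the promise with
  // the connected channel once the write-block request has been acknowledged.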
  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
    Promise<Channel> promise, int timeoutMs) {
    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
      new ProtobufVarint32FrameDecoder(),
      new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
      new SimpleChannelInboundHandler<BlockOpResponseProto>() {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
          throws Exception {
          Status pipelineStatus = resp.getStatus();
          if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
            throw new IOException("datanode " + dnInfo + " is restarting");
          }
          String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
          if (resp.getStatus() != Status.SUCCESS) {
            if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
              throw new InvalidBlockTokenException("Got access token error" + ", status message "
                + resp.getMessage() + ", " + logInfo);
            } else {
              throw new IOException("Got error" + ", status=" + resp.getStatus().name()
                + ", status message " + resp.getMessage() + ", " + logInfo);
            }
          }
          // success
          ChannelPipeline p = ctx.pipeline();
          for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
            // do not remove all handlers because we may have wrap or unwrap handlers at the head
            // of the pipeline.
            if (handler instanceof IdleStateHandler) {
              break;
            }
          }
          // Disable auto read here. Enable it after we set up the streaming pipeline in
          // FanOutOneBlockAsyncDFSOutput.
          ctx.channel().config().setAutoRead(false);
          promise.trySuccess(ctx.channel());
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
          if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
            promise
              .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
          } else {
            super.userEventTriggered(ctx, evt);
          }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          promise.tryFailure(cause);
        }
      });
  }

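  // Send the OpWriteBlock request: a 2-byte data transfer protocol version, the 1-byte op code,
  // then the varint32-delimited OpWriteBlockProto.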
  private static void requestWriteBlock(Channel channel, StorageType storageType,
    OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
    OpWriteBlockProto proto =
      writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build();
    int protoLen = proto.getSerializedSize();
    ByteBuf buffer =
      channel.alloc().buffer(3 + CodedOutputStream.computeUInt32SizeNoTag(protoLen) + protoLen);
    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
    buffer.writeByte(Op.WRITE_BLOCK.code);
    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
    safeWriteAndFlush(channel, buffer);
  }

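  // Negotiate SASL with the datanode first; once that succeeds, install the response handlers and
  // send the write-block request.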
  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
    StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
    DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
    throws IOException {
    Promise<Void> saslPromise = channel.eventLoop().newPromise();
    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
    addListener(saslPromise, new FutureListener<Void>() {

      @Override
      public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
          // setup response processing pipeline first, then send request.
          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
        } else {
          promise.tryFailure(future.cause());
        }
      }
    });
  }

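  // Connect to every datanode of the located block in parallel. Returns one future per datanode;
  // each future completes with a channel that is ready for streaming once the datanode has
  // acknowledged the write-block request.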
  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
    String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
    BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass) {
    StorageType[] storageTypes = locatedBlock.getStorageTypes();
    DatanodeInfo[] datanodeInfos = getLocatedBlockLocations(locatedBlock);
    boolean connectToDnViaHostname =
      conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
    blockCopy.setNumBytes(locatedBlock.getBlockSize());
    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
      .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy))
        .setToken(PBHelperClient.convert(locatedBlock.getBlockToken())))
      .setClientName(clientName).build();
    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
    OpWriteBlockProto.Builder writeBlockProtoBuilder =
      OpWriteBlockProto.newBuilder().setHeader(header)
        .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())).setPipelineSize(1)
        .setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()).setMaxBytesRcvd(maxBytesRcvd)
        .setLatestGenerationStamp(latestGS).setRequestedChecksum(checksumProto)
        .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
    for (int i = 0; i < datanodeInfos.length; i++) {
      DatanodeInfo dnInfo = datanodeInfos[i];
      StorageType storageType = storageTypes[i];
      Promise<Channel> promise = eventLoopGroup.next().newPromise();
      futureList.add(promise);
      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
      addListener(new Bootstrap().group(eventLoopGroup).channel(channelClass)
        .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {

          @Override
          protected void initChannel(Channel ch) throws Exception {
            // We need the remote address of the channel, so we can only move on after the channel
            // is connected. Leave an empty implementation here because netty does not allow a
            // null handler.
          }
        }).connect(NetUtils.createSocketAddr(dnAddr)), new ChannelFutureListener() {

          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
              initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
                timeoutMs, client, locatedBlock.getBlockToken(), promise);
            } else {
              promise.tryFailure(future.cause());
            }
          }
        });
    }
    return futureList;
  }

  /**
   * Exception other than RemoteException thrown when calling create on the namenode.
   */
  public static class NameNodeException extends IOException {

    private static final long serialVersionUID = 3143237406477095390L;

    public NameNodeException(Throwable cause) {
      super(cause);
    }
  }

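  // Build the create flags: always CREATE and SHOULD_REPLICATE, plus OVERWRITE and NO_LOCAL_WRITE
  // when requested.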
  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite,
    boolean noLocalWrite) {
    List<CreateFlag> flags = new ArrayList<>();
    flags.add(CreateFlag.CREATE);
    if (overwrite) {
      flags.add(CreateFlag.OVERWRITE);
    }
    if (noLocalWrite) {
      flags.add(CreateFlag.NO_LOCAL_WRITE);
    }
    flags.add(CreateFlag.SHOULD_REPLICATE);
    return new EnumSetWritable<>(EnumSet.copyOf(flags));
  }

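  // Create the file on the namenode, allocate the first block and connect to all of its
  // datanodes. On failure the broken datanodes are added to the exclude list and the whole
  // sequence is retried, up to ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES times.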
  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor,
    boolean noLocalWrite) throws IOException {
    Configuration conf = dfs.getConf();
    DFSClient client = dfs.getClient();
    String clientName = client.getClientName();
    ClientProtocol namenode = client.getNamenode();
    int createMaxRetries =
      conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
    ExcludeDatanodeManager excludeDatanodeManager = monitor.getExcludeDatanodeManager();
    Set<DatanodeInfo> toExcludeNodes =
      new HashSet<>(excludeDatanodeManager.getExcludeDNs().keySet());
    for (int retry = 0;; retry++) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("When creating output stream for {}, exclude list is {}, retry={}", src,
          getDataNodeInfo(toExcludeNodes), retry);
      }
      HdfsFileStatus stat;
      try {
        stat = FILE_CREATOR.create(namenode, src,
          FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
          getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize,
          CryptoProtocolVersion.supported());
      } catch (Exception e) {
        if (e instanceof RemoteException) {
          throw (RemoteException) e;
        } else {
          throw new NameNodeException(e);
        }
      }
      beginFileLease(client, stat);
      boolean succ = false;
      LocatedBlock locatedBlock = null;
      List<Future<Channel>> futureList = null;
      try {
        DataChecksum summer = createChecksum(client);
        locatedBlock = namenode.addBlock(src, client.getClientName(), null,
          toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null);
        Map<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
          PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
        for (int i = 0, n = futureList.size(); i < n; i++) {
          DatanodeInfo datanodeInfo = getLocatedBlockLocations(locatedBlock)[i];
          try {
            datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo);
          } catch (Exception e) {
            // exclude the broken DN next time
            toExcludeNodes.add(datanodeInfo);
            excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "connect error");
            throw e;
          }
        }
        Encryptor encryptor = createEncryptor(conf, stat, client);
        FanOutOneBlockAsyncDFSOutput output =
          new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, stat,
            locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
        succ = true;
        return output;
      } catch (RemoteException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (shouldRetryCreate(e)) {
          if (retry >= createMaxRetries) {
            throw e.unwrapRemoteException();
          }
        } else {
          throw e.unwrapRemoteException();
        }
      } catch (IOException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (retry >= createMaxRetries) {
          throw e;
        }
        // overwrite the old broken file.
        overwrite = true;
        try {
          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
        } catch (InterruptedException ie) {
          throw new InterruptedIOException();
        }
      } finally {
        if (!succ) {
          if (futureList != null) {
            for (Future<Channel> f : futureList) {
              addListener(f, new FutureListener<Channel>() {

                @Override
                public void operationComplete(Future<Channel> future) throws Exception {
                  if (future.isSuccess()) {
                    safeClose(future.getNow());
                  }
                }
              });
            }
          }
          endFileLease(client, stat);
        }
      }
    }
  }

  /**
   * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method may block, so do not call it inside
   * an {@link EventLoop}.
   */
  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
    final StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException {
    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

      @Override
      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
        throws IOException, UnresolvedLinkException {
        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
          blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
      }

      @Override
      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
        throw new UnsupportedOperationException();
      }
    }.resolve(dfs, f);
  }

  public static boolean shouldRetryCreate(RemoteException e) {
    // RetryStartFileException is introduced in HDFS 2.6+, so here we can only use the class name.
    // For exceptions other than this, we just throw it out. This is the same as
    // DFSOutputStream.newStreamForCreate.
    return e.getClassName().endsWith("RetryStartFileException");
  }

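  // Keep calling ClientProtocol.complete until the namenode reports the file as closed, then
  // release the file lease. Gives up early if the lease has already expired.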
  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
    ExtendedBlock block, HdfsFileStatus stat) {
    for (int retry = 0;; retry++) {
      try {
        if (namenode.complete(src, clientName, block, stat.getFileId())) {
          endFileLease(client, stat);
          return;
        } else {
          LOG.warn("complete file " + src + " not finished, retry = " + retry);
        }
      } catch (RemoteException e) {
        IOException ioe = e.unwrapRemoteException();
        if (ioe instanceof LeaseExpiredException) {
          LOG.warn("lease for file " + src + " has expired, give up", e);
          return;
        } else {
          LOG.warn("complete file " + src + " failed, retry = " + retry, e);
        }
      } catch (Exception e) {
        LOG.warn("complete file " + src + " failed, retry = " + retry, e);
      }
      sleepIgnoreInterrupt(retry);
    }
  }

  static void sleepIgnoreInterrupt(int retry) {
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
    } catch (InterruptedException e) {
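      // deliberately swallow the interrupt, as the method name says; the caller retries in a loop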
    }
  }

  public static String getDataNodeInfo(Collection<DatanodeInfo> datanodeInfos) {
    if (datanodeInfos.isEmpty()) {
      return "[]";
    }
    return datanodeInfos.stream()
      .map(datanodeInfo -> new StringBuilder().append("(").append(datanodeInfo.getHostName())
        .append("/").append(datanodeInfo.getInfoAddr()).append(":")
        .append(datanodeInfo.getInfoPort()).append(")").toString())
      .collect(Collectors.joining(",", "[", "]"));
  }
}