/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;

/**
 * An asynchronous HDFS output stream implementation which fans out data to datanodes and only
 * supports writing files with a single block.
 * <p>
 * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create an instance.
 * The main usage of this class is implementing the WAL, so we only expose a few HDFS
 * configurations in that method. We place it under the io package because we want to keep it
 * independent of the WAL implementation, which makes it easier to eventually move it into the
 * HDFS project.
 * <p>
 * Note that, although we support pipelined flush, i.e., writing new data and then flushing before
 * the previous flush succeeds, the implementation is not thread safe, so you should not call its
 * methods concurrently.
 * <p>
 * Advantages compared to DFSOutputStream:
 * <ol>
 * <li>The fan out mechanism, which reduces latency.</li>
 * <li>Fail-fast when a connection to a datanode fails, so the WAL implementation can open a new
 * writer ASAP.</li>
 * <li>We benefit from netty's ByteBuf management mechanism.</li>
 * </ol>
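 * <p>
 * A rough usage sketch; the createOutput parameters are elided here, see
 * {@link FanOutOneBlockAsyncDFSOutputHelper} for the actual signature:
 *
 * <pre>
 * FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(...);
 * out.write(data);
 * long ackedLength = FutureUtils.get(out.flush(false));
 * out.close();
 * </pre>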
 */
@InterfaceAudience.Private
public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {

  // The MAX_PACKET_SIZE is 16MB, but it includes the header and checksum sizes, so here we set a
  // smaller limit for the data size.
  private static final int MAX_DATA_LEN = 12 * 1024 * 1024;

  private final Configuration conf;

  private final DistributedFileSystem dfs;

  private final DFSClient client;

  private final ClientProtocol namenode;

  private final String clientName;

  private final String src;

  private HdfsFileStatus stat;

  private final ExtendedBlock block;

  private final DatanodeInfo[] locations;

  private final Encryptor encryptor;

  private final Map<Channel, DatanodeInfo> datanodeInfoMap;

  private final DataChecksum summer;

  private final int maxDataLen;

  private final ByteBufAllocator alloc;

  private static final class Callback {

    private final CompletableFuture<Long> future;

    private final long ackedLength;

    // should be backed by a thread safe collection
    private final Set<ChannelId> unfinishedReplicas;
    private final long packetDataLen;
    private final long flushTimestamp;
    private long lastAckTimestamp = -1;

    public Callback(CompletableFuture<Long> future, long ackedLength,
      final Collection<Channel> replicas, long packetDataLen) {
      this.future = future;
      this.ackedLength = ackedLength;
      this.packetDataLen = packetDataLen;
      this.flushTimestamp = EnvironmentEdgeManager.currentTime();
      if (replicas.isEmpty()) {
        this.unfinishedReplicas = Collections.emptySet();
      } else {
        this.unfinishedReplicas =
          Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
        replicas.stream().map(Channel::id).forEachOrdered(unfinishedReplicas::add);
      }
    }
  }

  private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();

  private volatile long ackedBlockLength = 0L;

  // this could be different from the acked block length because a packet cannot start in the
  // middle of a checksum chunk.
  private long nextPacketOffsetInBlock = 0L;

  // the length of the trailing partial chunk; the packet start offset must be aligned with the
  // checksum chunk length, so we need to resend this trailing data in the next packet.
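  // For example, assuming the default 512-byte checksum chunk: after flushing 1000 bytes in total,
  // the acked length is 1000, but the next packet must start at offset 512, so the trailing
  // 488 bytes stay in the buffer and are resent together with any new data.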
  private int trailingPartialChunkLength = 0;

  private long nextPacketSeqno = 0L;

  private ByteBuf buf;

  private final SendBufSizePredictor sendBufSizePredictor = new SendBufSizePredictor();

  // State for connections to DN
  private enum State {
    STREAMING,
    CLOSING,
    BROKEN,
    CLOSED
  }

  private volatile State state;

  private final StreamSlowMonitor streamSlowMonitor;

  // all lock-free to make it run faster
  private void completed(Channel channel) {
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // if the current unfinished replica set does not contain us then it means that we have
      // already acked this one, so keep iterating to find the first one we have not acked yet.
      if (c.unfinishedReplicas.remove(channel.id())) {
        long current = EnvironmentEdgeManager.currentTime();
        streamSlowMonitor.checkProcessTimeAndSpeed(datanodeInfoMap.get(channel), c.packetDataLen,
          current - c.flushTimestamp, c.lastAckTimestamp, c.unfinishedReplicas.size());
        c.lastAckTimestamp = current;
        if (c.unfinishedReplicas.isEmpty()) {
          // we need to remove the entry before completing the future. It is possible that after we
          // complete the future the upper layer will call close immediately, before we remove the
          // entry from waitingAckQueue, which leads to an IllegalStateException. We also set the
          // ackedBlockLength first, otherwise we may use a wrong length to commit the block. This
          // may lead to multiple removes and assigns but that is OK: the semantics of iter.remove
          // is to remove the entry returned by the previous call to next, so if the entry has
          // already been removed it is a no-op, and for the assign the values are the same.
          iter.remove();
          ackedBlockLength = c.ackedLength;
          // the future.complete check is to confirm that we are the only one who grabbed the work,
          // otherwise just give up and return.
          if (c.future.complete(c.ackedLength)) {
            // also wake up flush requests which have the same length.
            while (iter.hasNext()) {
              Callback maybeDummyCb = iter.next();
              if (maybeDummyCb.ackedLength == c.ackedLength) {
                iter.remove();
                maybeDummyCb.future.complete(c.ackedLength);
              } else {
                break;
              }
            }
          }
        }
        return;
      }
    }
  }

  // this usually does not happen, which means it is not on the critical path, so we make it
  // synchronized to keep the implementation easy to reason about despite the multiple state
  // changes and checks.
  private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
    if (state == State.CLOSED) {
      return;
    }
    if (state == State.BROKEN) {
      failWaitingAckQueue(channel, errorSupplier);
      return;
    }
    if (state == State.CLOSING) {
      Callback c = waitingAckQueue.peekFirst();
      if (c == null || !c.unfinishedReplicas.contains(channel.id())) {
        // nothing to do, the endBlock request has already finished.
        return;
      }
    }
    // disable further write, and fail all pending ack.
    state = State.BROKEN;
    failWaitingAckQueue(channel, errorSupplier);
    datanodeInfoMap.keySet().forEach(NettyFutureUtils::safeClose);
  }

  private void failWaitingAckQueue(Channel channel, Supplier<Throwable> errorSupplier) {
    Throwable error = errorSupplier.get();
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // find the first sync request which we have not acked yet and fail all the requests after it.
      if (!c.unfinishedReplicas.contains(channel.id())) {
        continue;
      }
      for (;;) {
        c.future.completeExceptionally(error);
        if (!iter.hasNext()) {
          break;
        }
        c = iter.next();
      }
      break;
    }
  }

  @Sharable
  private final class AckHandler extends SimpleChannelInboundHandler<PipelineAckProto> {

    private final int timeoutMs;

    public AckHandler(int timeoutMs) {
      this.timeoutMs = timeoutMs;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
      Status reply = getStatus(ack);
      if (reply != Status.SUCCESS) {
        failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " + block
          + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (PipelineAck.isRestartOOBStatus(reply)) {
        failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block "
          + block + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (ack.getSeqno() == HEART_BEAT_SEQNO) {
        return;
      }
      completed(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      if (state == State.CLOSED) {
        return;
      }
      failed(ctx.channel(),
        () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      failed(ctx.channel(), () -> cause);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {
        IdleStateEvent e = (IdleStateEvent) evt;
        if (e.state() == READER_IDLE) {
          failed(ctx.channel(),
            () -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
        } else if (e.state() == WRITER_IDLE) {
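          // we have been idle as a writer, so send an empty heartbeat packet (zero data length,
          // HEART_BEAT_SEQNO) to keep the connection to the datanode alive.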
          PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
          int len = heartbeat.getSerializedSize();
          ByteBuf buf = alloc.buffer(len);
          heartbeat.putInBuffer(buf.nioBuffer(0, len));
          buf.writerIndex(len);
          safeWriteAndFlush(ctx.channel(), buf);
        }
        return;
      }
      super.userEventTriggered(ctx, evt);
    }
  }

  private void setupReceiver(int timeoutMs) {
    AckHandler ackHandler = new AckHandler(timeoutMs);
    for (Channel ch : datanodeInfoMap.keySet()) {
      ch.pipeline().addLast(
        new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
        new ProtobufVarint32FrameDecoder(),
        new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler);
      ch.config().setAutoRead(true);
    }
  }

  FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, DFSClient client,
    ClientProtocol namenode, String clientName, String src, HdfsFileStatus stat,
    LocatedBlock locatedBlock, Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap,
    DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
    this.conf = conf;
    this.dfs = dfs;
    this.client = client;
    this.namenode = namenode;
    this.stat = stat;
    this.clientName = clientName;
    this.src = src;
    this.block = locatedBlock.getBlock();
    this.locations = getLocatedBlockLocations(locatedBlock);
    this.encryptor = encryptor;
    this.datanodeInfoMap = datanodeInfoMap;
    this.summer = summer;
    this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
    this.alloc = alloc;
    this.buf = alloc.directBuffer(sendBufSizePredictor.initialSize());
    this.state = State.STREAMING;
    setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
    this.streamSlowMonitor = streamSlowMonitor;
  }

  @Override
  public void writeInt(int i) {
    buf.ensureWritable(4);
    buf.writeInt(i);
  }

  @Override
  public void write(ByteBuffer bb) {
    buf.ensureWritable(bb.remaining());
    buf.writeBytes(bb);
  }

  @Override
  public void write(byte[] b) {
    write(b, 0, b.length);
  }

  @Override
  public void write(byte[] b, int off, int len) {
    buf.ensureWritable(len);
    buf.writeBytes(b, off, len);
  }

  @Override
  public int buffered() {
    return buf.readableBytes();
  }

  @Override
  public DatanodeInfo[] getPipeline() {
    return locations;
  }

  private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
    long nextPacketOffsetInBlock, boolean syncBlock) {
    int dataLen = dataBuf.readableBytes();
    int chunkLen = summer.getBytesPerChecksum();
    int trailingPartialChunkLen = dataLen % chunkLen;
    int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
    int checksumLen = numChecks * summer.getChecksumSize();
    ByteBuf checksumBuf = alloc.directBuffer(checksumLen);
    summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
    checksumBuf.writerIndex(checksumLen);
    PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock,
      nextPacketSeqno, false, dataLen, syncBlock);
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.buffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    Callback c =
      new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeInfoMap.keySet(), dataLen);
    waitingAckQueue.addLast(c);
    // recheck after we have pushed the callback to the queue
    if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
      future.completeExceptionally(new IOException("stream already broken"));
      // it's the one we have just pushed or just a no-op
      waitingAckQueue.removeFirst();

      checksumBuf.release();
      headerBuf.release();

      // This method takes ownership of the dataBuf, so we need to release it before returning.
      dataBuf.release();
      return;
    }
    // TODO: we should perhaps measure time taken per DN here;
    // we could collect statistics per DN, and/or exclude bad nodes in createOutput.
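    // fan out: write the same packet (header + checksums + data) to every datanode directly,
    // rather than pipelining it through the datanodes as DFSOutputStream does.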
    datanodeInfoMap.keySet().forEach(ch -> {
      safeWrite(ch, headerBuf.retainedDuplicate());
      safeWrite(ch, checksumBuf.retainedDuplicate());
      safeWriteAndFlush(ch, dataBuf.retainedDuplicate());
    });
    checksumBuf.release();
    headerBuf.release();
    dataBuf.release();
    nextPacketSeqno++;
  }

  private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
    if (state != State.STREAMING) {
      future.completeExceptionally(new IOException("stream already broken"));
      return;
    }
    int dataLen = buf.readableBytes();
    if (dataLen == trailingPartialChunkLength) {
      // no new data
      long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
      Callback lastFlush = waitingAckQueue.peekLast();
      if (lastFlush != null) {
        Callback c = new Callback(future, lengthAfterFlush, Collections.emptySet(), dataLen);
        waitingAckQueue.addLast(c);
        // recheck here if we have already removed the previous callback from the queue
        if (waitingAckQueue.peekFirst() == c) {
          // all previous callbacks have been removed
          // notice that this does not mean we will always win here, because the background thread
          // may have already started to mark the future as completed in the completed or failed
          // methods but has not removed it from the queue yet. That's also why the removeFirst
          // call below may be a no-op.
          if (state != State.STREAMING) {
            future.completeExceptionally(new IOException("stream already broken"));
          } else {
            future.complete(lengthAfterFlush);
          }
          // it's the one we have just pushed or just a no-op
          waitingAckQueue.removeFirst();
        }
      } else {
        // we must have acked all the data, so the ackedBlockLength must be the same as
        // lengthAfterFlush
        future.complete(lengthAfterFlush);
      }
      return;
    }

    if (encryptor != null) {
      ByteBuf encryptBuf = alloc.directBuffer(dataLen);
      buf.readBytes(encryptBuf, trailingPartialChunkLength);
      int toEncryptLength = dataLen - trailingPartialChunkLength;
      try {
        encryptor.encrypt(buf.nioBuffer(trailingPartialChunkLength, toEncryptLength),
          encryptBuf.nioBuffer(trailingPartialChunkLength, toEncryptLength));
      } catch (IOException e) {
        encryptBuf.release();
        future.completeExceptionally(e);
        return;
      }
      encryptBuf.writerIndex(dataLen);
      buf.release();
      buf = encryptBuf;
    }

    if (dataLen > maxDataLen) {
      // We need to write out the data in multiple packets, as the maximum allowed packet size is 16MB.
      long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
      for (int remaining = dataLen;;) {
        if (remaining < maxDataLen) {
          flushBuffer(future, buf.readRetainedSlice(remaining), nextSubPacketOffsetInBlock,
            syncBlock);
          break;
        } else {
          flushBuffer(new CompletableFuture<>(), buf.readRetainedSlice(maxDataLen),
            nextSubPacketOffsetInBlock, syncBlock);
          remaining -= maxDataLen;
          nextSubPacketOffsetInBlock += maxDataLen;
        }
      }
    } else {
      flushBuffer(future, buf.retain(), nextPacketOffsetInBlock, syncBlock);
    }
    trailingPartialChunkLength = dataLen % summer.getBytesPerChecksum();
    ByteBuf newBuf = alloc.directBuffer(sendBufSizePredictor.guess(dataLen))
      .ensureWritable(trailingPartialChunkLength);
    if (trailingPartialChunkLength != 0) {
      buf.readerIndex(dataLen - trailingPartialChunkLength).readBytes(newBuf,
        trailingPartialChunkLength);
    }
    buf.release();
    this.buf = newBuf;
    nextPacketOffsetInBlock += dataLen - trailingPartialChunkLength;
  }

  /**
   * Flush the buffer out to datanodes.
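   * <p>
   * A rough usage sketch; blocking on the returned future via the FutureUtils helper is only one
   * option, callers may also chain callbacks on it:
   *
   * <pre>
   * long ackedLength = FutureUtils.get(output.flush(false));
   * </pre>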
   * @param syncBlock will call hsync if true, otherwise hflush.
   * @return A CompletableFuture that holds the acked length after flushing.
   */
  @Override
  public CompletableFuture<Long> flush(boolean syncBlock) {
    CompletableFuture<Long> future = new CompletableFuture<>();
    flush0(future, syncBlock);
    return future;
  }

  private void endBlock() throws IOException {
    Preconditions.checkState(waitingAckQueue.isEmpty(),
      "should call flush first before calling close");
    if (state != State.STREAMING) {
      throw new IOException("stream already broken");
    }
    state = State.CLOSING;
    long finalizedLength = ackedBlockLength;
    PacketHeader header = new PacketHeader(4, finalizedLength, nextPacketSeqno, true, 0, false);
    buf.release();
    buf = null;
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.directBuffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    CompletableFuture<Long> future = new CompletableFuture<>();
    waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0));
    datanodeInfoMap.keySet().forEach(ch -> safeWriteAndFlush(ch, headerBuf.retainedDuplicate()));
    headerBuf.release();
    FutureUtils.get(future);
  }

  private void closeDataNodeChannelsAndAwait() {
    List<ChannelFuture> futures = new ArrayList<>();
    for (Channel ch : datanodeInfoMap.keySet()) {
      futures.add(ch.close());
    }
    for (ChannelFuture future : futures) {
      consume(future.awaitUninterruptibly());
    }
  }

  /**
   * The close method to use when an error has occurred. Currently we just call recoverFileLease.
   */
  @Override
  public void recoverAndClose(CancelableProgressable reporter) throws IOException {
    if (buf != null) {
      buf.release();
      buf = null;
    }
    closeDataNodeChannelsAndAwait();
    endFileLease(client, stat);
    RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
      reporter == null ? new CancelOnClose(client) : reporter);
  }

  /**
   * End the current block and complete the file at the namenode. You should call
   * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
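   * <p>
   * A sketch of the expected error handling pattern; passing {@code null} as the reporter lets
   * {@link #recoverAndClose(CancelableProgressable)} fall back to its default CancelOnClose
   * behavior:
   *
   * <pre>
   * try {
   *   output.close();
   * } catch (IOException e) {
   *   output.recoverAndClose(null);
   * }
   * </pre>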
   */
  @Override
  public void close() throws IOException {
    endBlock();
    state = State.CLOSED;
    closeDataNodeChannelsAndAwait();
    block.setNumBytes(ackedBlockLength);
    completeFile(client, namenode, src, clientName, block, stat);
  }

  @Override
  public boolean isBroken() {
    return state == State.BROKEN;
  }

  @Override
  public long getSyncedLength() {
    return this.ackedBlockLength;
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  Map<Channel, DatanodeInfo> getDatanodeInfoMap() {
    return this.datanodeInfoMap;
  }
}