001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO; 021import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT; 022import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile; 023import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease; 024import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus; 025import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations; 026import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume; 027import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite; 028import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush; 029import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; 030import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; 031import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE; 032 033import com.google.errorprone.annotations.RestrictedApi; 034import java.io.IOException; 035import java.nio.ByteBuffer; 036import java.util.ArrayList; 037import java.util.Collection; 038import java.util.Collections; 039import java.util.Iterator; 040import java.util.List; 041import java.util.Map; 042import java.util.Set; 043import java.util.concurrent.CompletableFuture; 044import java.util.concurrent.ConcurrentHashMap; 045import java.util.concurrent.ConcurrentLinkedDeque; 046import java.util.concurrent.TimeUnit; 047import java.util.function.Supplier; 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.crypto.Encryptor; 050import org.apache.hadoop.fs.Path; 051import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose; 052import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 053import org.apache.hadoop.hbase.util.CancelableProgressable; 054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 055import org.apache.hadoop.hbase.util.FutureUtils; 056import org.apache.hadoop.hbase.util.NettyFutureUtils; 057import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; 058import org.apache.hadoop.hdfs.DFSClient; 059import org.apache.hadoop.hdfs.DistributedFileSystem; 060import org.apache.hadoop.hdfs.protocol.ClientProtocol; 061import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 062import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 063import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 064import org.apache.hadoop.hdfs.protocol.LocatedBlock; 065import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; 066import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; 067import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto; 068import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; 069import org.apache.hadoop.util.DataChecksum; 070import org.apache.yetus.audience.InterfaceAudience; 071 072import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 073import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 074import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; 075import org.apache.hbase.thirdparty.io.netty.channel.Channel; 076import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture; 077import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable; 078import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 079import org.apache.hbase.thirdparty.io.netty.channel.ChannelId; 080import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 081import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; 082import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; 083import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; 084 085/** 086 * An asynchronous HDFS output stream implementation which fans out data to datanode and only 087 * supports writing file with only one block. 088 * <p> 089 * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create. The main 090 * usage of this class is implementing WAL, so we only expose a little HDFS configurations in the 091 * method. And we place it here under io package because we want to make it independent of WAL 092 * implementation thus easier to move it to HDFS project finally. 093 * <p> 094 * Note that, although we support pipelined flush, i.e, write new data and then flush before the 095 * previous flush succeeds, the implementation is not thread safe, so you should not call its 096 * methods concurrently. 097 * <p> 098 * Advantages compare to DFSOutputStream: 099 * <ol> 100 * <li>The fan out mechanism. This will reduce the latency.</li> 101 * <li>Fail-fast when connection to datanode error. The WAL implementation could open new writer 102 * ASAP.</li> 103 * <li>We could benefit from netty's ByteBuf management mechanism.</li> 104 * </ol> 105 */ 106@InterfaceAudience.Private 107public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { 108 109 // The MAX_PACKET_SIZE is 16MB, but it includes the header size and checksum size. So here we set 110 // a smaller limit for data size. 111 private static final int MAX_DATA_LEN = 12 * 1024 * 1024; 112 113 private final Configuration conf; 114 115 private final DistributedFileSystem dfs; 116 117 private final DFSClient client; 118 119 private final ClientProtocol namenode; 120 121 private final String clientName; 122 123 private final String src; 124 125 private HdfsFileStatus stat; 126 127 private final ExtendedBlock block; 128 129 private final DatanodeInfo[] locations; 130 131 private final Encryptor encryptor; 132 133 private final Map<Channel, DatanodeInfo> datanodeInfoMap; 134 135 private final DataChecksum summer; 136 137 private final int maxDataLen; 138 139 private final ByteBufAllocator alloc; 140 141 private static final class Callback { 142 143 private final CompletableFuture<Long> future; 144 145 private final long ackedLength; 146 147 // should be backed by a thread safe collection 148 private final Set<ChannelId> unfinishedReplicas; 149 private final long packetDataLen; 150 private final long flushTimestamp; 151 private long lastAckTimestamp = -1; 152 153 public Callback(CompletableFuture<Long> future, long ackedLength, 154 final Collection<Channel> replicas, long packetDataLen) { 155 this.future = future; 156 this.ackedLength = ackedLength; 157 this.packetDataLen = packetDataLen; 158 this.flushTimestamp = EnvironmentEdgeManager.currentTime(); 159 if (replicas.isEmpty()) { 160 this.unfinishedReplicas = Collections.emptySet(); 161 } else { 162 this.unfinishedReplicas = 163 Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size())); 164 replicas.stream().map(Channel::id).forEachOrdered(unfinishedReplicas::add); 165 } 166 } 167 } 168 169 private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>(); 170 171 private volatile long ackedBlockLength = 0L; 172 173 // this could be different from acked block length because a packet can not start at the middle of 174 // a chunk. 175 private long nextPacketOffsetInBlock = 0L; 176 177 // the length of the trailing partial chunk, this is because the packet start offset must be 178 // aligned with the length of checksum chunk, so we need to resend the same data. 179 private int trailingPartialChunkLength = 0; 180 181 private long nextPacketSeqno = 0L; 182 183 private ByteBuf buf; 184 185 private final SendBufSizePredictor sendBufSizePRedictor = new SendBufSizePredictor(); 186 187 // State for connections to DN 188 private enum State { 189 STREAMING, 190 CLOSING, 191 BROKEN, 192 CLOSED 193 } 194 195 private volatile State state; 196 197 private final StreamSlowMonitor streamSlowMonitor; 198 199 // all lock-free to make it run faster 200 private void completed(Channel channel) { 201 for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) { 202 Callback c = iter.next(); 203 // if the current unfinished replicas does not contain us then it means that we have already 204 // acked this one, let's iterate to find the one we have not acked yet. 205 if (c.unfinishedReplicas.remove(channel.id())) { 206 long current = EnvironmentEdgeManager.currentTime(); 207 streamSlowMonitor.checkProcessTimeAndSpeed(datanodeInfoMap.get(channel), c.packetDataLen, 208 current - c.flushTimestamp, c.lastAckTimestamp, c.unfinishedReplicas.size()); 209 c.lastAckTimestamp = current; 210 if (c.unfinishedReplicas.isEmpty()) { 211 // we need to remove first before complete the future. It is possible that after we 212 // complete the future the upper layer will call close immediately before we remove the 213 // entry from waitingAckQueue and lead to an IllegalStateException. And also set the 214 // ackedBlockLength first otherwise we may use a wrong length to commit the block. This 215 // may lead to multiple remove and assign but is OK. The semantic of iter.remove is 216 // removing the entry returned by calling previous next, so if the entry has already been 217 // removed then it is a no-op, and for the assign, the values are the same so no problem. 218 iter.remove(); 219 ackedBlockLength = c.ackedLength; 220 // the future.complete check is to confirm that we are the only one who grabbed the work, 221 // otherwise just give up and return. 222 if (c.future.complete(c.ackedLength)) { 223 // also wake up flush requests which have the same length. 224 while (iter.hasNext()) { 225 Callback maybeDummyCb = iter.next(); 226 if (maybeDummyCb.ackedLength == c.ackedLength) { 227 iter.remove(); 228 maybeDummyCb.future.complete(c.ackedLength); 229 } else { 230 break; 231 } 232 } 233 } 234 } 235 return; 236 } 237 } 238 } 239 240 // this usually does not happen which means it is not on the critical path so make it synchronized 241 // so that the implementation will not burn up our brain as there are multiple state changes and 242 // checks. 243 private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) { 244 if (state == State.CLOSED) { 245 return; 246 } 247 if (state == State.BROKEN) { 248 failWaitingAckQueue(channel, errorSupplier); 249 return; 250 } 251 if (state == State.CLOSING) { 252 Callback c = waitingAckQueue.peekFirst(); 253 if (c == null || !c.unfinishedReplicas.contains(channel.id())) { 254 // nothing, the endBlock request has already finished. 255 return; 256 } 257 } 258 // disable further write, and fail all pending ack. 259 state = State.BROKEN; 260 failWaitingAckQueue(channel, errorSupplier); 261 datanodeInfoMap.keySet().forEach(NettyFutureUtils::safeClose); 262 } 263 264 private void failWaitingAckQueue(Channel channel, Supplier<Throwable> errorSupplier) { 265 Throwable error = errorSupplier.get(); 266 for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) { 267 Callback c = iter.next(); 268 // find the first sync request which we have not acked yet and fail all the request after it. 269 if (!c.unfinishedReplicas.contains(channel.id())) { 270 continue; 271 } 272 for (;;) { 273 c.future.completeExceptionally(error); 274 if (!iter.hasNext()) { 275 break; 276 } 277 c = iter.next(); 278 } 279 break; 280 } 281 } 282 283 @Sharable 284 private final class AckHandler extends SimpleChannelInboundHandler<PipelineAckProto> { 285 286 private final int timeoutMs; 287 288 public AckHandler(int timeoutMs) { 289 this.timeoutMs = timeoutMs; 290 } 291 292 @Override 293 protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception { 294 Status reply = getStatus(ack); 295 if (reply != Status.SUCCESS) { 296 failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " + block 297 + " from datanode " + ctx.channel().remoteAddress())); 298 return; 299 } 300 if (PipelineAck.isRestartOOBStatus(reply)) { 301 failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block " 302 + block + " from datanode " + ctx.channel().remoteAddress())); 303 return; 304 } 305 if (ack.getSeqno() == HEART_BEAT_SEQNO) { 306 return; 307 } 308 completed(ctx.channel()); 309 } 310 311 @Override 312 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 313 if (state == State.CLOSED) { 314 return; 315 } 316 failed(ctx.channel(), 317 () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed")); 318 } 319 320 @Override 321 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 322 failed(ctx.channel(), () -> cause); 323 } 324 325 @Override 326 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 327 if (evt instanceof IdleStateEvent) { 328 IdleStateEvent e = (IdleStateEvent) evt; 329 if (e.state() == READER_IDLE) { 330 failed(ctx.channel(), 331 () -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); 332 } else if (e.state() == WRITER_IDLE) { 333 PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false); 334 int len = heartbeat.getSerializedSize(); 335 ByteBuf buf = alloc.buffer(len); 336 heartbeat.putInBuffer(buf.nioBuffer(0, len)); 337 buf.writerIndex(len); 338 safeWriteAndFlush(ctx.channel(), buf); 339 } 340 return; 341 } 342 super.userEventTriggered(ctx, evt); 343 } 344 } 345 346 private void setupReceiver(int timeoutMs) { 347 AckHandler ackHandler = new AckHandler(timeoutMs); 348 for (Channel ch : datanodeInfoMap.keySet()) { 349 ch.pipeline().addLast( 350 new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS), 351 new ProtobufVarint32FrameDecoder(), 352 new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler); 353 ch.config().setAutoRead(true); 354 } 355 } 356 357 FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, DFSClient client, 358 ClientProtocol namenode, String clientName, String src, HdfsFileStatus stat, 359 LocatedBlock locatedBlock, Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap, 360 DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) { 361 this.conf = conf; 362 this.dfs = dfs; 363 this.client = client; 364 this.namenode = namenode; 365 this.stat = stat; 366 this.clientName = clientName; 367 this.src = src; 368 this.block = locatedBlock.getBlock(); 369 this.locations = getLocatedBlockLocations(locatedBlock); 370 this.encryptor = encryptor; 371 this.datanodeInfoMap = datanodeInfoMap; 372 this.summer = summer; 373 this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum()); 374 this.alloc = alloc; 375 this.buf = alloc.directBuffer(sendBufSizePRedictor.initialSize()); 376 this.state = State.STREAMING; 377 setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT)); 378 this.streamSlowMonitor = streamSlowMonitor; 379 } 380 381 @Override 382 public void writeInt(int i) { 383 buf.ensureWritable(4); 384 buf.writeInt(i); 385 } 386 387 @Override 388 public void write(ByteBuffer bb) { 389 buf.ensureWritable(bb.remaining()); 390 buf.writeBytes(bb); 391 } 392 393 @Override 394 public void write(byte[] b) { 395 write(b, 0, b.length); 396 } 397 398 @Override 399 public void write(byte[] b, int off, int len) { 400 buf.ensureWritable(len); 401 buf.writeBytes(b, off, len); 402 } 403 404 @Override 405 public int buffered() { 406 return buf.readableBytes(); 407 } 408 409 @Override 410 public DatanodeInfo[] getPipeline() { 411 return locations; 412 } 413 414 private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf, 415 long nextPacketOffsetInBlock, boolean syncBlock) { 416 int dataLen = dataBuf.readableBytes(); 417 int chunkLen = summer.getBytesPerChecksum(); 418 int trailingPartialChunkLen = dataLen % chunkLen; 419 int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0); 420 int checksumLen = numChecks * summer.getChecksumSize(); 421 ByteBuf checksumBuf = alloc.directBuffer(checksumLen); 422 summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen)); 423 checksumBuf.writerIndex(checksumLen); 424 PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock, 425 nextPacketSeqno, false, dataLen, syncBlock); 426 int headerLen = header.getSerializedSize(); 427 ByteBuf headerBuf = alloc.buffer(headerLen); 428 header.putInBuffer(headerBuf.nioBuffer(0, headerLen)); 429 headerBuf.writerIndex(headerLen); 430 Callback c = 431 new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeInfoMap.keySet(), dataLen); 432 waitingAckQueue.addLast(c); 433 // recheck again after we pushed the callback to queue 434 if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) { 435 future.completeExceptionally(new IOException("stream already broken")); 436 // it's the one we have just pushed or just a no-op 437 waitingAckQueue.removeFirst(); 438 439 checksumBuf.release(); 440 headerBuf.release(); 441 442 // This method takes ownership of the dataBuf, so we need release it before returning. 443 dataBuf.release(); 444 return; 445 } 446 // TODO: we should perhaps measure time taken per DN here; 447 // we could collect statistics per DN, and/or exclude bad nodes in createOutput. 448 datanodeInfoMap.keySet().forEach(ch -> { 449 safeWrite(ch, headerBuf.retainedDuplicate()); 450 safeWrite(ch, checksumBuf.retainedDuplicate()); 451 safeWriteAndFlush(ch, dataBuf.retainedDuplicate()); 452 }); 453 checksumBuf.release(); 454 headerBuf.release(); 455 dataBuf.release(); 456 nextPacketSeqno++; 457 } 458 459 private void flush0(CompletableFuture<Long> future, boolean syncBlock) { 460 if (state != State.STREAMING) { 461 future.completeExceptionally(new IOException("stream already broken")); 462 return; 463 } 464 int dataLen = buf.readableBytes(); 465 if (dataLen == trailingPartialChunkLength) { 466 // no new data 467 long lengthAfterFlush = nextPacketOffsetInBlock + dataLen; 468 Callback lastFlush = waitingAckQueue.peekLast(); 469 if (lastFlush != null) { 470 Callback c = new Callback(future, lengthAfterFlush, Collections.emptySet(), dataLen); 471 waitingAckQueue.addLast(c); 472 // recheck here if we have already removed the previous callback from the queue 473 if (waitingAckQueue.peekFirst() == c) { 474 // all previous callbacks have been removed 475 // notice that this does mean we will always win here because the background thread may 476 // have already started to mark the future here as completed in the completed or failed 477 // methods but haven't removed it from the queue yet. That's also why the removeFirst 478 // call below may be a no-op. 479 if (state != State.STREAMING) { 480 future.completeExceptionally(new IOException("stream already broken")); 481 } else { 482 future.complete(lengthAfterFlush); 483 } 484 // it's the one we have just pushed or just a no-op 485 waitingAckQueue.removeFirst(); 486 } 487 } else { 488 // we must have acked all the data so the ackedBlockLength must be same with 489 // lengthAfterFlush 490 future.complete(lengthAfterFlush); 491 } 492 return; 493 } 494 495 if (encryptor != null) { 496 ByteBuf encryptBuf = alloc.directBuffer(dataLen); 497 buf.readBytes(encryptBuf, trailingPartialChunkLength); 498 int toEncryptLength = dataLen - trailingPartialChunkLength; 499 try { 500 encryptor.encrypt(buf.nioBuffer(trailingPartialChunkLength, toEncryptLength), 501 encryptBuf.nioBuffer(trailingPartialChunkLength, toEncryptLength)); 502 } catch (IOException e) { 503 encryptBuf.release(); 504 future.completeExceptionally(e); 505 return; 506 } 507 encryptBuf.writerIndex(dataLen); 508 buf.release(); 509 buf = encryptBuf; 510 } 511 512 if (dataLen > maxDataLen) { 513 // We need to write out the data by multiple packets as the max packet allowed is 16M. 514 long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock; 515 for (int remaining = dataLen;;) { 516 if (remaining < maxDataLen) { 517 flushBuffer(future, buf.readRetainedSlice(remaining), nextSubPacketOffsetInBlock, 518 syncBlock); 519 break; 520 } else { 521 flushBuffer(new CompletableFuture<>(), buf.readRetainedSlice(maxDataLen), 522 nextSubPacketOffsetInBlock, syncBlock); 523 remaining -= maxDataLen; 524 nextSubPacketOffsetInBlock += maxDataLen; 525 } 526 } 527 } else { 528 flushBuffer(future, buf.retain(), nextPacketOffsetInBlock, syncBlock); 529 } 530 trailingPartialChunkLength = dataLen % summer.getBytesPerChecksum(); 531 ByteBuf newBuf = alloc.directBuffer(sendBufSizePRedictor.guess(dataLen)) 532 .ensureWritable(trailingPartialChunkLength); 533 if (trailingPartialChunkLength != 0) { 534 buf.readerIndex(dataLen - trailingPartialChunkLength).readBytes(newBuf, 535 trailingPartialChunkLength); 536 } 537 buf.release(); 538 this.buf = newBuf; 539 nextPacketOffsetInBlock += dataLen - trailingPartialChunkLength; 540 } 541 542 /** 543 * Flush the buffer out to datanodes. 544 * @param syncBlock will call hsync if true, otherwise hflush. 545 * @return A CompletableFuture that hold the acked length after flushing. 546 */ 547 @Override 548 public CompletableFuture<Long> flush(boolean syncBlock) { 549 CompletableFuture<Long> future = new CompletableFuture<>(); 550 flush0(future, syncBlock); 551 return future; 552 } 553 554 private void endBlock() throws IOException { 555 Preconditions.checkState(waitingAckQueue.isEmpty(), 556 "should call flush first before calling close"); 557 if (state != State.STREAMING) { 558 throw new IOException("stream already broken"); 559 } 560 state = State.CLOSING; 561 long finalizedLength = ackedBlockLength; 562 PacketHeader header = new PacketHeader(4, finalizedLength, nextPacketSeqno, true, 0, false); 563 buf.release(); 564 buf = null; 565 int headerLen = header.getSerializedSize(); 566 ByteBuf headerBuf = alloc.directBuffer(headerLen); 567 header.putInBuffer(headerBuf.nioBuffer(0, headerLen)); 568 headerBuf.writerIndex(headerLen); 569 CompletableFuture<Long> future = new CompletableFuture<>(); 570 waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0)); 571 datanodeInfoMap.keySet().forEach(ch -> safeWriteAndFlush(ch, headerBuf.retainedDuplicate())); 572 headerBuf.release(); 573 FutureUtils.get(future); 574 } 575 576 private void closeDataNodeChannelsAndAwait() { 577 List<ChannelFuture> futures = new ArrayList<>(); 578 for (Channel ch : datanodeInfoMap.keySet()) { 579 futures.add(ch.close()); 580 } 581 for (ChannelFuture future : futures) { 582 consume(future.awaitUninterruptibly()); 583 } 584 } 585 586 /** 587 * The close method when error occurred. Now we just call recoverFileLease. 588 */ 589 @Override 590 public void recoverAndClose(CancelableProgressable reporter) throws IOException { 591 if (buf != null) { 592 buf.release(); 593 buf = null; 594 } 595 closeDataNodeChannelsAndAwait(); 596 endFileLease(client, stat); 597 RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf, 598 reporter == null ? new CancelOnClose(client) : reporter); 599 } 600 601 /** 602 * End the current block and complete file at namenode. You should call 603 * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception. 604 */ 605 @Override 606 public void close() throws IOException { 607 endBlock(); 608 state = State.CLOSED; 609 closeDataNodeChannelsAndAwait(); 610 block.setNumBytes(ackedBlockLength); 611 completeFile(client, namenode, src, clientName, block, stat); 612 } 613 614 @Override 615 public boolean isBroken() { 616 return state == State.BROKEN; 617 } 618 619 @Override 620 public long getSyncedLength() { 621 return this.ackedBlockLength; 622 } 623 624 @RestrictedApi(explanation = "Should only be called in tests", link = "", 625 allowedOnPath = ".*/src/test/.*") 626 Map<Channel, DatanodeInfo> getDatanodeInfoMap() { 627 return this.datanodeInfoMap; 628 } 629}