/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;

/**
 * An asynchronous HDFS output stream implementation which fans out data to the datanodes and only
 * supports writing a file with a single block.
 * <p>
 * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create an instance.
 * The main usage of this class is implementing the WAL, so we only expose a few HDFS
 * configurations in that method. And we place it here under the io package because we want to make
 * it independent of the WAL implementation, and thus easier to move it to the HDFS project
 * eventually.
 * <p>
 * Note that, although we support pipelined flush, i.e., writing new data and then flushing before
 * the previous flush succeeds, the implementation is not thread safe, so you should not call its
 * methods concurrently.
 * <p>
 * Advantages compared to DFSOutputStream:
 * <ol>
 * <li>The fan out mechanism. This will reduce the latency.</li>
 * <li>Fail-fast when there is an error on a connection to a datanode. The WAL implementation can
 * open a new writer ASAP.</li>
 * <li>We can benefit from netty's ByteBuf management mechanism.</li>
 * </ol>
 */
@InterfaceAudience.Private
public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
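
  // A minimal usage sketch (illustrative only; the argument values and the event loop/monitor
  // wiring are assumptions, see FanOutOneBlockAsyncDFSOutputHelper for the actual factory
  // signature in your version):
  //
  //   FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(dfs,
  //     path, true, false, (short) 3, blockSize, eventLoopGroup, channelClass, monitor);
  //   out.write(data);
  //   out.flush(false).whenComplete((ackedLen, error) -> { /* ... */ });
  //   out.close();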

  // The MAX_PACKET_SIZE is 16MB, but it includes the header size and checksum size. So here we
  // set a smaller limit for data size.
  private static final int MAX_DATA_LEN = 12 * 1024 * 1024;

  private final Configuration conf;

  private final DistributedFileSystem dfs;

  private final DFSClient client;

  private final ClientProtocol namenode;

  private final String clientName;

  private final String src;

  private final long fileId;

  private final ExtendedBlock block;

  private final DatanodeInfo[] locations;

  private final Encryptor encryptor;

  private final Map<Channel, DatanodeInfo> datanodeInfoMap;

  private final DataChecksum summer;

  private final int maxDataLen;

  private final ByteBufAllocator alloc;

  private static final class Callback {

    private final CompletableFuture<Long> future;

    private final long ackedLength;

    // should be backed by a thread safe collection
    private final Set<ChannelId> unfinishedReplicas;
    private final long packetDataLen;
    private final long flushTimestamp;
    private long lastAckTimestamp = -1;

    public Callback(CompletableFuture<Long> future, long ackedLength,
      final Collection<Channel> replicas, long packetDataLen) {
      this.future = future;
      this.ackedLength = ackedLength;
      this.packetDataLen = packetDataLen;
      this.flushTimestamp = EnvironmentEdgeManager.currentTime();
      if (replicas.isEmpty()) {
        this.unfinishedReplicas = Collections.emptySet();
      } else {
        this.unfinishedReplicas =
          Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
        replicas.stream().map(Channel::id).forEachOrdered(unfinishedReplicas::add);
      }
    }
  }

  private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();

  private volatile long ackedBlockLength = 0L;

  // this could be different from the acked block length because a packet cannot start in the
  // middle of a checksum chunk.
  private long nextPacketOffsetInBlock = 0L;

  // the length of the trailing partial chunk; the packet start offset must be aligned to the
  // checksum chunk size, so we need to resend the partial chunk data in the next packet.
  private int trailingPartialChunkLength = 0;
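
  // A worked example of the alignment above (numbers are illustrative; the chunk size comes from
  // DataChecksum#getBytesPerChecksum, typically 512 bytes): with 512-byte chunks, if 1124 bytes
  // have been flushed and acked, then ackedBlockLength is 1124, but the next packet must start at
  // a chunk boundary, so nextPacketOffsetInBlock is 1024 and trailingPartialChunkLength is 100;
  // those 100 bytes are carried over into the new buffer and resent as part of the next packet.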

  private long nextPacketSeqno = 0L;

  private ByteBuf buf;

  private final SendBufSizePredictor sendBufSizePredictor = new SendBufSizePredictor();

  // State for connections to DN
  private enum State {
    STREAMING,
    CLOSING,
    BROKEN,
    CLOSED
  }
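
  // Possible state transitions (see failed(), endBlock() and close() below):
  //   STREAMING -> CLOSING (endBlock) -> CLOSED (close)
  //   STREAMING or CLOSING -> BROKEN (failed, e.g. a datanode error or a timeout)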

  private volatile State state;

  private final StreamSlowMonitor streamSlowMonitor;

  // all lock-free to make it run faster
  private void completed(Channel channel) {
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // if the current set of unfinished replicas does not contain us then it means that we have
      // already acked this one, let's iterate to find the one we have not acked yet.
      if (c.unfinishedReplicas.remove(channel.id())) {
        long current = EnvironmentEdgeManager.currentTime();
        streamSlowMonitor.checkProcessTimeAndSpeed(datanodeInfoMap.get(channel), c.packetDataLen,
          current - c.flushTimestamp, c.lastAckTimestamp, c.unfinishedReplicas.size());
        c.lastAckTimestamp = current;
        if (c.unfinishedReplicas.isEmpty()) {
          // we need to remove first before completing the future. It is possible that after we
          // complete the future the upper layer will call close immediately before we remove the
          // entry from waitingAckQueue, which leads to an IllegalStateException. And also set the
          // ackedBlockLength first, otherwise we may use a wrong length to commit the block. This
          // may lead to multiple removes and assigns but that is OK. The semantic of iter.remove
          // is removing the entry returned by the previous call to next, so if the entry has
          // already been removed then it is a no-op, and for the assign, the values are the same
          // so no problem.
          iter.remove();
          ackedBlockLength = c.ackedLength;
          // the future.complete check is to confirm that we are the only one who grabbed the
          // work, otherwise just give up and return.
          if (c.future.complete(c.ackedLength)) {
            // also wake up flush requests which have the same length.
            while (iter.hasNext()) {
              Callback maybeDummyCb = iter.next();
              if (maybeDummyCb.ackedLength == c.ackedLength) {
                iter.remove();
                maybeDummyCb.future.complete(c.ackedLength);
              } else {
                break;
              }
            }
          }
        }
        return;
      }
    }
  }

  // this usually does not happen, which means it is not on the critical path, so make it
  // synchronized so that the implementation will not burn up our brain, as there are multiple
  // state changes and checks.
  private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
    if (state == State.CLOSED) {
      return;
    }
    if (state == State.BROKEN) {
      failWaitingAckQueue(channel, errorSupplier);
      return;
    }
    if (state == State.CLOSING) {
      Callback c = waitingAckQueue.peekFirst();
      if (c == null || !c.unfinishedReplicas.contains(channel.id())) {
        // nothing to do, the endBlock request has already finished.
        return;
      }
    }
    // disable further writes, and fail all pending acks.
    state = State.BROKEN;
    failWaitingAckQueue(channel, errorSupplier);
    datanodeInfoMap.keySet().forEach(NettyFutureUtils::safeClose);
  }

  private void failWaitingAckQueue(Channel channel, Supplier<Throwable> errorSupplier) {
    Throwable error = errorSupplier.get();
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // find the first sync request which we have not acked yet and fail all the requests
      // after it.
      if (!c.unfinishedReplicas.contains(channel.id())) {
        continue;
      }
      for (;;) {
        c.future.completeExceptionally(error);
        if (!iter.hasNext()) {
          break;
        }
        c = iter.next();
      }
      break;
    }
  }

  @Sharable
  private final class AckHandler extends SimpleChannelInboundHandler<PipelineAckProto> {

    private final int timeoutMs;

    public AckHandler(int timeoutMs) {
      this.timeoutMs = timeoutMs;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
      Status reply = getStatus(ack);
      if (reply != Status.SUCCESS) {
        failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " + block
          + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (PipelineAck.isRestartOOBStatus(reply)) {
        failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block "
          + block + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (ack.getSeqno() == HEART_BEAT_SEQNO) {
        return;
      }
      completed(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      if (state == State.CLOSED) {
        return;
      }
      failed(ctx.channel(),
        () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      failed(ctx.channel(), () -> cause);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {
        IdleStateEvent e = (IdleStateEvent) evt;
        if (e.state() == READER_IDLE) {
          failed(ctx.channel(),
            () -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
        } else if (e.state() == WRITER_IDLE) {
          PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
          int len = heartbeat.getSerializedSize();
          ByteBuf buf = alloc.buffer(len);
          heartbeat.putInBuffer(buf.nioBuffer(0, len));
          buf.writerIndex(len);
          safeWriteAndFlush(ctx.channel(), buf);
        }
        return;
      }
      super.userEventTriggered(ctx, evt);
    }
  }
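
  // Note on idle handling: the IdleStateHandler installed in setupReceiver() below fires
  // READER_IDLE after timeoutMs without a response, which breaks the stream, and WRITER_IDLE
  // after timeoutMs / 2 without a write, which triggers the heartbeat packet above (an empty
  // packet with HEART_BEAT_SEQNO) so that the datanodes do not time out an idle but healthy
  // connection.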

  private void setupReceiver(int timeoutMs) {
    AckHandler ackHandler = new AckHandler(timeoutMs);
    for (Channel ch : datanodeInfoMap.keySet()) {
      ch.pipeline().addLast(
        new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
        new ProtobufVarint32FrameDecoder(),
        new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler);
      ch.config().setAutoRead(true);
    }
  }

  FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, DFSClient client,
    ClientProtocol namenode, String clientName, String src, long fileId, LocatedBlock locatedBlock,
    Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap, DataChecksum summer,
    ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
    this.conf = conf;
    this.dfs = dfs;
    this.client = client;
    this.namenode = namenode;
    this.fileId = fileId;
    this.clientName = clientName;
    this.src = src;
    this.block = locatedBlock.getBlock();
    this.locations = getLocatedBlockLocations(locatedBlock);
    this.encryptor = encryptor;
    this.datanodeInfoMap = datanodeInfoMap;
    this.summer = summer;
    this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
    this.alloc = alloc;
    this.buf = alloc.directBuffer(sendBufSizePredictor.initialSize());
    this.state = State.STREAMING;
    setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
    this.streamSlowMonitor = streamSlowMonitor;
  }

  @Override
  public void writeInt(int i) {
    buf.ensureWritable(4);
    buf.writeInt(i);
  }

  @Override
  public void write(ByteBuffer bb) {
    buf.ensureWritable(bb.remaining());
    buf.writeBytes(bb);
  }

  @Override
  public void write(byte[] b) {
    write(b, 0, b.length);
  }

  @Override
  public void write(byte[] b, int off, int len) {
    buf.ensureWritable(len);
    buf.writeBytes(b, off, len);
  }

  @Override
  public int buffered() {
    return buf.readableBytes();
  }

  @Override
  public DatanodeInfo[] getPipeline() {
    return locations;
  }
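
  // Wire format of a data packet, as assembled in flushBuffer() below: a PacketHeader whose
  // packetLen field is 4 (the length of the packetLen field itself) + checksumLen + dataLen,
  // followed by the checksum bytes, followed by the data bytes. Note that maxDataLen is
  // MAX_DATA_LEN rounded down to a multiple of the checksum chunk size, so a full packet always
  // carries whole chunks. As an illustrative example, with 512-byte chunks and a 4-byte checksum
  // per chunk (e.g. CRC32C), a 1124-byte payload needs 3 checksums (12 bytes), giving
  // packetLen = 4 + 12 + 1124.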

  private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
    long nextPacketOffsetInBlock, boolean syncBlock) {
    int dataLen = dataBuf.readableBytes();
    int chunkLen = summer.getBytesPerChecksum();
    int trailingPartialChunkLen = dataLen % chunkLen;
    int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
    int checksumLen = numChecks * summer.getChecksumSize();
    ByteBuf checksumBuf = alloc.directBuffer(checksumLen);
    summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
    checksumBuf.writerIndex(checksumLen);
    PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock,
      nextPacketSeqno, false, dataLen, syncBlock);
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.buffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    Callback c =
      new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeInfoMap.keySet(), dataLen);
    waitingAckQueue.addLast(c);
    // recheck again after we have pushed the callback to the queue
    if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
      future.completeExceptionally(new IOException("stream already broken"));
      // it's the one we have just pushed or just a no-op
      waitingAckQueue.removeFirst();

      checksumBuf.release();
      headerBuf.release();

      // This method takes ownership of the dataBuf, so we need to release it before returning.
      dataBuf.release();
      return;
    }
    // TODO: we should perhaps measure time taken per DN here;
    // we could collect statistics per DN, and/or exclude bad nodes in createOutput.
    datanodeInfoMap.keySet().forEach(ch -> {
      safeWrite(ch, headerBuf.retainedDuplicate());
      safeWrite(ch, checksumBuf.retainedDuplicate());
      safeWriteAndFlush(ch, dataBuf.retainedDuplicate());
    });
    checksumBuf.release();
    headerBuf.release();
    dataBuf.release();
    nextPacketSeqno++;
  }

  private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
    if (state != State.STREAMING) {
      future.completeExceptionally(new IOException("stream already broken"));
      return;
    }
    int dataLen = buf.readableBytes();
    if (dataLen == trailingPartialChunkLength) {
      // no new data
      long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
      Callback lastFlush = waitingAckQueue.peekLast();
      if (lastFlush != null) {
        Callback c = new Callback(future, lengthAfterFlush, Collections.emptySet(), dataLen);
        waitingAckQueue.addLast(c);
        // recheck here if we have already removed the previous callback from the queue
        if (waitingAckQueue.peekFirst() == c) {
          // all previous callbacks have been removed
          // notice that this does not mean we will always win here, because the background
          // thread may have already started to mark the future here as completed in the
          // completed or failed methods but has not removed it from the queue yet. That's also
          // why the removeFirst call below may be a no-op.
          if (state != State.STREAMING) {
            future.completeExceptionally(new IOException("stream already broken"));
          } else {
            future.complete(lengthAfterFlush);
          }
          // it's the one we have just pushed or just a no-op
          waitingAckQueue.removeFirst();
        }
      } else {
        // we must have acked all the data, so the ackedBlockLength must be the same as
        // lengthAfterFlush
        future.complete(lengthAfterFlush);
      }
      return;
    }

    if (encryptor != null) {
      ByteBuf encryptBuf = alloc.directBuffer(dataLen);
      buf.readBytes(encryptBuf, trailingPartialChunkLength);
      int toEncryptLength = dataLen - trailingPartialChunkLength;
      try {
        encryptor.encrypt(buf.nioBuffer(trailingPartialChunkLength, toEncryptLength),
          encryptBuf.nioBuffer(trailingPartialChunkLength, toEncryptLength));
      } catch (IOException e) {
        encryptBuf.release();
        future.completeExceptionally(e);
        return;
      }
      encryptBuf.writerIndex(dataLen);
      buf.release();
      buf = encryptBuf;
    }
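
    // Split into multiple packets when the flushed data exceeds maxDataLen. A hypothetical
    // example: with maxDataLen of roughly 12MB, a 30MB flush below goes out as two full packets
    // (each with a fresh throwaway future) followed by one ~6MB packet that carries the caller's
    // future, so the caller is completed only once the final packet is acked.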
    if (dataLen > maxDataLen) {
      // We need to write out the data in multiple packets as the max packet allowed is 16M.
      long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
      for (int remaining = dataLen;;) {
        if (remaining < maxDataLen) {
          flushBuffer(future, buf.readRetainedSlice(remaining), nextSubPacketOffsetInBlock,
            syncBlock);
          break;
        } else {
          flushBuffer(new CompletableFuture<>(), buf.readRetainedSlice(maxDataLen),
            nextSubPacketOffsetInBlock, syncBlock);
          remaining -= maxDataLen;
          nextSubPacketOffsetInBlock += maxDataLen;
        }
      }
    } else {
      flushBuffer(future, buf.retain(), nextPacketOffsetInBlock, syncBlock);
    }
    trailingPartialChunkLength = dataLen % summer.getBytesPerChecksum();
    ByteBuf newBuf = alloc.directBuffer(sendBufSizePredictor.guess(dataLen))
      .ensureWritable(trailingPartialChunkLength);
    if (trailingPartialChunkLength != 0) {
      buf.readerIndex(dataLen - trailingPartialChunkLength).readBytes(newBuf,
        trailingPartialChunkLength);
    }
    buf.release();
    this.buf = newBuf;
    nextPacketOffsetInBlock += dataLen - trailingPartialChunkLength;
  }

  /**
   * Flush the buffer out to the datanodes.
   * @param syncBlock will call hsync if true, otherwise hflush.
   * @return A CompletableFuture that holds the acked length after flushing.
   */
  @Override
  public CompletableFuture<Long> flush(boolean syncBlock) {
    CompletableFuture<Long> future = new CompletableFuture<>();
    flush0(future, syncBlock);
    return future;
  }

  private void endBlock() throws IOException {
    Preconditions.checkState(waitingAckQueue.isEmpty(),
      "should call flush first before calling close");
    if (state != State.STREAMING) {
      throw new IOException("stream already broken");
    }
    state = State.CLOSING;
    long finalizedLength = ackedBlockLength;
    PacketHeader header = new PacketHeader(4, finalizedLength, nextPacketSeqno, true, 0, false);
    buf.release();
    buf = null;
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.directBuffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    CompletableFuture<Long> future = new CompletableFuture<>();
    waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0));
    datanodeInfoMap.keySet().forEach(ch -> safeWriteAndFlush(ch, headerBuf.retainedDuplicate()));
    headerBuf.release();
    FutureUtils.get(future);
  }

  private void closeDataNodeChannelsAndAwait() {
    List<ChannelFuture> futures = new ArrayList<>();
    for (Channel ch : datanodeInfoMap.keySet()) {
      futures.add(ch.close());
    }
    for (ChannelFuture future : futures) {
      consume(future.awaitUninterruptibly());
    }
  }

  /**
   * The close method to call when an error has occurred. Now we just call recoverFileLease.
   */
  @Override
  public void recoverAndClose(CancelableProgressable reporter) throws IOException {
    if (buf != null) {
      buf.release();
      buf = null;
    }
    closeDataNodeChannelsAndAwait();
    endFileLease(client, fileId);
    RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
      reporter == null ? new CancelOnClose(client) : reporter);
  }
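
  // A sketch of the caller pattern implied by the contract in the javadoc below (illustrative
  // only):
  //
  //   try {
  //     out.close();
  //   } catch (IOException e) {
  //     out.recoverAndClose(null);
  //     throw e;
  //   }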

  /**
   * End the current block and complete the file at the namenode. You should call
   * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
   */
  @Override
  public void close() throws IOException {
    endBlock();
    state = State.CLOSED;
    closeDataNodeChannelsAndAwait();
    block.setNumBytes(ackedBlockLength);
    completeFile(client, namenode, src, clientName, block, fileId);
  }

  @Override
  public boolean isBroken() {
    return state == State.BROKEN;
  }

  @Override
  public long getSyncedLength() {
    return this.ackedBlockLength;
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
    allowedOnPath = ".*/src/test/.*")
  Map<Channel, DatanodeInfo> getDatanodeInfoMap() {
    return this.datanodeInfoMap;
  }
}