/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.addListener;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

/**
 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
 */
@InterfaceAudience.Private
public final class FanOutOneBlockAsyncDFSOutputHelper {
  private static final Logger LOG =
    LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);

  private FanOutOneBlockAsyncDFSOutputHelper() {
  }

  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";

  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
  // use pooled allocator for performance.
  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

  // copied from DFSPacket since it is package private.
  public static final long HEART_BEAT_SEQNO = -1L;

  // Timeout for communicating with DataNodes during streaming writes/reads.
  public static final int READ_TIMEOUT = 60 * 1000;

  private interface LeaseManager {

    void begin(DFSClient client, HdfsFileStatus stat);

    void end(DFSClient client, HdfsFileStatus stat);
  }

  private static final LeaseManager LEASE_MANAGER;

  // helper class for creating files.
  private interface FileCreator {
    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, CryptoProtocolVersion[] supportedVersions) throws Exception {
      try {
        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
          replication, blockSize, supportedVersions);
      } catch (InvocationTargetException e) {
        if (e.getCause() instanceof Exception) {
          throw (Exception) e.getCause();
        } else {
          throw new RuntimeException(e.getCause());
        }
      }
    }

    Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
      EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions) throws Exception;
  }

  private static final FileCreator FILE_CREATOR;
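
  // Hadoop 3.4 changed DFSClient#beginFileLease/#endFileLease to key leases by a
  // namespace-qualified String instead of a plain long file id, so we probe for the matching
  // overload reflectively at class load time: try the 3.4+ signature first, then fall back to
  // the older one.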
  private static LeaseManager createLeaseManager3_4() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
      DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", String.class);
    endFileLeaseMethod.setAccessible(true);
    Method getConfigurationMethod = DFSClient.class.getDeclaredMethod("getConfiguration");
    getConfigurationMethod.setAccessible(true);
    Method getNamespaceMethod = HdfsFileStatus.class.getDeclaredMethod("getNamespace");

    return new LeaseManager() {

      private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY =
        "dfs.client.output.stream.uniq.default.key";
      private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT";

      private String getUniqId(DFSClient client, HdfsFileStatus stat)
        throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
        // Copied from DFSClient in Hadoop 3.4.0
        long fileId = stat.getFileId();
        String namespace = (String) getNamespaceMethod.invoke(stat);
        if (namespace == null) {
          Configuration conf = (Configuration) getConfigurationMethod.invoke(client);
          String defaultKey = conf.get(DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY,
            DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT);
          return defaultKey + "_" + fileId;
        } else {
          return namespace + "_" + fileId;
        }
      }

      @Override
      public void begin(DFSClient client, HdfsFileStatus stat) {
        try {
          beginFileLeaseMethod.invoke(client, getUniqId(client, stat), null);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(DFSClient client, HdfsFileStatus stat) {
        try {
          endFileLeaseMethod.invoke(client, getUniqId(client, stat));
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static LeaseManager createLeaseManager3() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
      DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
    endFileLeaseMethod.setAccessible(true);
    return new LeaseManager() {

      @Override
      public void begin(DFSClient client, HdfsFileStatus stat) {
        try {
          beginFileLeaseMethod.invoke(client, stat.getFileId(), null);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(DFSClient client, HdfsFileStatus stat) {
        try {
          endFileLeaseMethod.invoke(client, stat.getFileId());
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
    try {
      return createLeaseManager3_4();
    } catch (NoSuchMethodException e) {
      LOG.debug("DFSClient::beginFileLease has a different signature, should be hadoop 3.3 or below");
    }

    return createLeaseManager3();
  }

  private static FileCreator createFileCreator3_3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null, null);
    };
  }

  private static FileCreator createFileCreator3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null);
    };
  }

  private static FileCreator createFileCreator() throws NoSuchMethodException {
    try {
      return createFileCreator3_3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 3.2 or below");
    }

    return createFileCreator3();
  }
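
  // A minimal sketch of how the resolved FILE_CREATOR is invoked (the real call site is in
  // createOutput below; the path and settings here are illustrative only):
  //
  //   HdfsFileStatus stat = FILE_CREATOR.create(namenode, "/example/file",
  //     FsPermission.getFileDefault(), clientName,
  //     new EnumSetWritable<>(EnumSet.of(CreateFlag.CREATE)), true, (short) 3,
  //     dfs.getDefaultBlockSize(), CryptoProtocolVersion.supported());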

  // cancel the processing if DFSClient is already closed.
  static final class CancelOnClose implements CancelableProgressable {

    private final DFSClient client;

    public CancelOnClose(DFSClient client) {
      this.client = client;
    }

    @Override
    public boolean progress() {
      return client.isClientRunning();
    }
  }

  static {
    try {
      LEASE_MANAGER = createLeaseManager();
      FILE_CREATOR = createFileCreator();
    } catch (Exception e) {
      String msg = "Couldn't properly initialize access to HDFS internals. Please "
        + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
        + "HBASE-16110 for more information.";
      LOG.error(msg, e);
      throw new Error(msg, e);
    }
  }

  static void beginFileLease(DFSClient client, HdfsFileStatus stat) {
    LEASE_MANAGER.begin(client, stat);
  }

  static void endFileLease(DFSClient client, HdfsFileStatus stat) {
    LEASE_MANAGER.end(client, stat);
  }

  static DataChecksum createChecksum(DFSClient client) {
    return client.getConf().createChecksum(null);
  }

  static Status getStatus(PipelineAckProto ack) {
    List<Integer> flagList = ack.getFlagList();
    Integer headerFlag;
    if (flagList.isEmpty()) {
      Status reply = ack.getReply(0);
      headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply);
    } else {
      headerFlag = flagList.get(0);
    }
    return PipelineAck.getStatusFromHeader(headerFlag);
  }
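
  // Newer datanodes report ack status through a combined header flag that also carries ECN
  // information, while older ones only fill the plain reply list; getStatus above normalizes
  // both forms. The handler below consumes the single BlockOpResponseProto a datanode sends
  // back for our write-block request.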
  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
    Promise<Channel> promise, int timeoutMs) {
    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
      new ProtobufVarint32FrameDecoder(),
      new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
      new SimpleChannelInboundHandler<BlockOpResponseProto>() {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
          throws Exception {
          Status pipelineStatus = resp.getStatus();
          if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
            throw new IOException("datanode " + dnInfo + " is restarting");
          }
          String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
          if (resp.getStatus() != Status.SUCCESS) {
            if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
              throw new InvalidBlockTokenException("Got access token error" + ", status message "
                + resp.getMessage() + ", " + logInfo);
            } else {
              throw new IOException("Got error" + ", status=" + resp.getStatus().name()
                + ", status message " + resp.getMessage() + ", " + logInfo);
            }
          }
          // success
          ChannelPipeline p = ctx.pipeline();
          for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
            // do not remove all handlers because we may have wrap or unwrap handlers at the head
            // of the pipeline.
            if (handler instanceof IdleStateHandler) {
              break;
            }
          }
          // Disable auto read here. Enable it after we setup the streaming pipeline in
          // FanOutOneBlockAsyncDFSOutput.
          ctx.channel().config().setAutoRead(false);
          promise.trySuccess(ctx.channel());
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
          if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
            promise
              .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
          } else {
            super.userEventTriggered(ctx, evt);
          }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          promise.tryFailure(cause);
        }
      });
  }

  private static void requestWriteBlock(Channel channel, StorageType storageType,
    OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
    OpWriteBlockProto proto =
      writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build();
    int protoLen = proto.getSerializedSize();
    ByteBuf buffer =
      channel.alloc().buffer(3 + CodedOutputStream.computeUInt32SizeNoTag(protoLen) + protoLen);
    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
    buffer.writeByte(Op.WRITE_BLOCK.code);
    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
    safeWriteAndFlush(channel, buffer);
  }

  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
    StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
    DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
    throws IOException {
    Promise<Void> saslPromise = channel.eventLoop().newPromise();
    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
    addListener(saslPromise, new FutureListener<Void>() {

      @Override
      public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
          // setup response processing pipeline first, then send request.
          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
        } else {
          promise.tryFailure(future.cause());
        }
      }
    });
  }
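
  // Unlike the normal HDFS write path, where the client writes to the first datanode and the
  // datanodes chain the data to each other, here we open a direct connection to every replica
  // and send each of them a write-block request with pipelineSize=1, hence "fan-out".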
  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
    String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
    BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass) {
    StorageType[] storageTypes = locatedBlock.getStorageTypes();
    DatanodeInfo[] datanodeInfos = getLocatedBlockLocations(locatedBlock);
    boolean connectToDnViaHostname =
      conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
    blockCopy.setNumBytes(locatedBlock.getBlockSize());
    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
      .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy))
        .setToken(PBHelperClient.convert(locatedBlock.getBlockToken())))
      .setClientName(clientName).build();
    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
    OpWriteBlockProto.Builder writeBlockProtoBuilder =
      OpWriteBlockProto.newBuilder().setHeader(header)
        .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())).setPipelineSize(1)
        .setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()).setMaxBytesRcvd(maxBytesRcvd)
        .setLatestGenerationStamp(latestGS).setRequestedChecksum(checksumProto)
        .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
    for (int i = 0; i < datanodeInfos.length; i++) {
      DatanodeInfo dnInfo = datanodeInfos[i];
      StorageType storageType = storageTypes[i];
      Promise<Channel> promise = eventLoopGroup.next().newPromise();
      futureList.add(promise);
      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
      addListener(new Bootstrap().group(eventLoopGroup).channel(channelClass)
        .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {

          @Override
          protected void initChannel(Channel ch) throws Exception {
            // we need to get the remote address of the channel so we can only move on after
            // the channel is connected. Leave an empty implementation here because netty does
            // not allow a null handler.
          }
        }).connect(NetUtils.createSocketAddr(dnAddr)), new ChannelFutureListener() {

          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
              initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
                timeoutMs, client, locatedBlock.getBlockToken(), promise);
            } else {
              promise.tryFailure(future.cause());
            }
          }
        });
    }
    return futureList;
  }
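
  // Each returned future completes with a Channel that is ready for streaming once the datanode
  // has acknowledged the write-block request; createOutput below syncs on every future and
  // excludes any datanode whose connection fails before retrying.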

  /**
   * Exception other than RemoteException thrown when calling create on namenode
   */
  public static class NameNodeException extends IOException {

    private static final long serialVersionUID = 3143237406477095390L;

    public NameNodeException(Throwable cause) {
      super(cause);
    }
  }

  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite,
    boolean noLocalWrite) {
    List<CreateFlag> flags = new ArrayList<>();
    flags.add(CreateFlag.CREATE);
    if (overwrite) {
      flags.add(CreateFlag.OVERWRITE);
    }
    if (noLocalWrite) {
      flags.add(CreateFlag.NO_LOCAL_WRITE);
    }
    flags.add(CreateFlag.SHOULD_REPLICATE);
    return new EnumSetWritable<>(EnumSet.copyOf(flags));
  }
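
  // SHOULD_REPLICATE asks the namenode to create a plainly replicated file even if the parent
  // directory carries an erasure coding policy; the fan-out output only works against
  // replicated blocks.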

  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor,
    boolean noLocalWrite) throws IOException {
    Configuration conf = dfs.getConf();
    DFSClient client = dfs.getClient();
    String clientName = client.getClientName();
    ClientProtocol namenode = client.getNamenode();
    int createMaxRetries =
      conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
    ExcludeDatanodeManager excludeDatanodeManager = monitor.getExcludeDatanodeManager();
    Set<DatanodeInfo> toExcludeNodes =
      new HashSet<>(excludeDatanodeManager.getExcludeDNs().keySet());
    for (int retry = 0;; retry++) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("When creating output stream for {}, exclude list is {}, retry={}", src,
          getDataNodeInfo(toExcludeNodes), retry);
      }
      HdfsFileStatus stat;
      try {
        stat = FILE_CREATOR.create(namenode, src,
          FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
          getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize,
          CryptoProtocolVersion.supported());
      } catch (Exception e) {
        if (e instanceof RemoteException) {
          throw (RemoteException) e;
        } else {
          throw new NameNodeException(e);
        }
      }
      beginFileLease(client, stat);
      boolean succ = false;
      LocatedBlock locatedBlock = null;
      List<Future<Channel>> futureList = null;
      try {
        DataChecksum summer = createChecksum(client);
        locatedBlock = namenode.addBlock(src, client.getClientName(), null,
          toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null);
        Map<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
          PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
        for (int i = 0, n = futureList.size(); i < n; i++) {
          DatanodeInfo datanodeInfo = getLocatedBlockLocations(locatedBlock)[i];
          try {
            datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo);
          } catch (Exception e) {
            // exclude the broken DN next time
            toExcludeNodes.add(datanodeInfo);
            excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "connect error");
            throw e;
          }
        }
        Encryptor encryptor = createEncryptor(conf, stat, client);
        FanOutOneBlockAsyncDFSOutput output =
          new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, stat,
            locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
        succ = true;
        return output;
      } catch (RemoteException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (shouldRetryCreate(e)) {
          if (retry >= createMaxRetries) {
            throw e.unwrapRemoteException();
          }
        } else {
          throw e.unwrapRemoteException();
        }
      } catch (IOException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (retry >= createMaxRetries) {
          throw e;
        }
        // overwrite the old broken file.
        overwrite = true;
        try {
          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
        } catch (InterruptedException ie) {
          throw new InterruptedIOException();
        }
      } finally {
        if (!succ) {
          if (futureList != null) {
            for (Future<Channel> f : futureList) {
              addListener(f, new FutureListener<Channel>() {

                @Override
                public void operationComplete(Future<Channel> future) throws Exception {
                  if (future.isSuccess()) {
                    safeClose(future.getNow());
                  }
                }
              });
            }
          }
          endFileLease(client, stat);
        }
      }
    }
  }

  /**
   * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method may block, so do not call it inside
   * an {@link EventLoop}.
   */
  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
    final StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException {
    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

      @Override
      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
        throws IOException, UnresolvedLinkException {
        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
          blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
      }

      @Override
      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
        throw new UnsupportedOperationException();
      }
    }.resolve(dfs, f);
  }
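
  // A minimal usage sketch, assuming a running HDFS and a Netty NIO event loop; the names below
  // (conf, NioEventLoopGroup, NioSocketChannel, the path and the monitor name) are illustrative,
  // not prescribed by this class:
  //
  //   DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
  //   EventLoopGroup group = new NioEventLoopGroup();
  //   FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(dfs,
  //     new Path("/example/wal"), true, false, (short) 3, dfs.getDefaultBlockSize(), group,
  //     NioSocketChannel.class, StreamSlowMonitor.create(conf, "example"), false);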

  public static boolean shouldRetryCreate(RemoteException e) {
    // RetryStartFileException is introduced in HDFS 2.6+, so here we can only use the class name.
    // For exceptions other than this, we just throw them out. This is the same as
    // DFSOutputStream.newStreamForCreate.
    return e.getClassName().endsWith("RetryStartFileException");
  }

  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
    ExtendedBlock block, HdfsFileStatus stat) {
    for (int retry = 0;; retry++) {
      try {
        if (namenode.complete(src, clientName, block, stat.getFileId())) {
          endFileLease(client, stat);
          return;
        } else {
          LOG.warn("complete file " + src + " not finished, retry = " + retry);
        }
      } catch (RemoteException e) {
        IOException ioe = e.unwrapRemoteException();
        if (ioe instanceof LeaseExpiredException) {
          LOG.warn("lease for file " + src + " is expired, give up", e);
          return;
        } else {
          LOG.warn("complete file " + src + " failed, retry = " + retry, e);
        }
      } catch (Exception e) {
        LOG.warn("complete file " + src + " failed, retry = " + retry, e);
      }
      sleepIgnoreInterrupt(retry);
    }
  }

  static void sleepIgnoreInterrupt(int retry) {
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
    } catch (InterruptedException e) {
    }
  }

  public static String getDataNodeInfo(Collection<DatanodeInfo> datanodeInfos) {
    if (datanodeInfos.isEmpty()) {
      return "[]";
    }
    return datanodeInfos.stream()
      .map(datanodeInfo -> new StringBuilder().append("(").append(datanodeInfo.getHostName())
        .append("/").append(datanodeInfo.getInfoAddr()).append(":")
        .append(datanodeInfo.getInfoPort()).append(")").toString())
      .collect(Collectors.joining(",", "[", "]"));
  }
}