/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.addListener;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;

import com.google.protobuf.CodedOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

/**
 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
 */
@InterfaceAudience.Private
public final class FanOutOneBlockAsyncDFSOutputHelper {
  private static final Logger LOG =
    LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);

  private FanOutOneBlockAsyncDFSOutputHelper() {
  }

  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";

  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
  // use pooled allocator for performance.
  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

  // copied from DFSPacket since it is package private.
  public static final long HEART_BEAT_SEQNO = -1L;

  // Timeouts for communicating with DataNode for streaming writes/reads
  public static final int READ_TIMEOUT = 60 * 1000;

  private interface LeaseManager {

    void begin(DFSClient client, long inodeId);

    void end(DFSClient client, long inodeId);
  }

  private static final LeaseManager LEASE_MANAGER;

  // This is used to terminate a recoverFileLease call when the FileSystem is already closed.
  // isClientRunning is not public so we need to use reflection.
  private interface DFSClientAdaptor {

    boolean isClientRunning(DFSClient client);
  }

  private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;

  // helper interface for creating files.
  private interface FileCreator {
    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, CryptoProtocolVersion[] supportedVersions) throws Exception {
      try {
        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
          replication, blockSize, supportedVersions);
      } catch (InvocationTargetException e) {
        if (e.getCause() instanceof Exception) {
          throw (Exception) e.getCause();
        } else {
          throw new RuntimeException(e.getCause());
        }
      }
    }

    Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
      EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions) throws Exception;
  }

  private static final FileCreator FILE_CREATOR;

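  // LEASE_MANAGER, DFS_CLIENT_ADAPTOR and FILE_CREATOR above, together with SHOULD_REPLICATE_FLAG
  // below, are reflection-based adapters over HDFS client internals. They are all initialized once
  // in the static block further down; if any lookup fails, the class throws an Error and the
  // 'asyncfs' WAL provider cannot be used (see HBASE-16110).
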
  // CreateFlag.SHOULD_REPLICATE is used to make an OutputStream on an EC directory support
  // hflush/hsync, but EC was introduced in hadoop 3.x so we do not have this enum value on 2.x;
  // that is why we need to reference it indirectly through reflection.
  private static final CreateFlag SHOULD_REPLICATE_FLAG;

  private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
    Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
    isClientRunningMethod.setAccessible(true);
    return new DFSClientAdaptor() {

      @Override
      public boolean isClientRunning(DFSClient client) {
        try {
          return (Boolean) isClientRunningMethod.invoke(client);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
    Method beginFileLeaseMethod =
      DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
    beginFileLeaseMethod.setAccessible(true);
    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
    endFileLeaseMethod.setAccessible(true);
    return new LeaseManager() {

      @Override
      public void begin(DFSClient client, long inodeId) {
        try {
          beginFileLeaseMethod.invoke(client, inodeId, null);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void end(DFSClient client, long inodeId) {
        try {
          endFileLeaseMethod.invoke(client, inodeId);
        } catch (IllegalAccessException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }

  private static FileCreator createFileCreator3_3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null, null);
    };
  }

  private static FileCreator createFileCreator3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class, String.class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions, null);
    };
  }

  private static FileCreator createFileCreator2() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
      CryptoProtocolVersion[].class);

    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
      supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
        createParent, replication, blockSize, supportedVersions);
    };
  }

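  // Pick the matching ClientProtocol#create adapter by probing signatures from newest to oldest:
  // the hadoop 3.3+ variant (two extra trailing String parameters), then the 3.x variant (one
  // extra String), and finally the 2.x variant. A missing signature surfaces as
  // NoSuchMethodException; the extra trailing parameters are always passed as null.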
  private static FileCreator createFileCreator() throws NoSuchMethodException {
    try {
      return createFileCreator3_3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 3.2 or below");
    }

    try {
      return createFileCreator3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x");
    }
    return createFileCreator2();
  }

  private static CreateFlag loadShouldReplicateFlag() {
    try {
      return CreateFlag.valueOf("SHOULD_REPLICATE");
    } catch (IllegalArgumentException e) {
      LOG.debug("can not find SHOULD_REPLICATE flag, should be hadoop 2.x", e);
      return null;
    }
  }

  // cancel the processing if DFSClient is already closed.
  static final class CancelOnClose implements CancelableProgressable {

    private final DFSClient client;

    public CancelOnClose(DFSClient client) {
      this.client = client;
    }

    @Override
    public boolean progress() {
      return DFS_CLIENT_ADAPTOR.isClientRunning(client);
    }
  }

  static {
    try {
      LEASE_MANAGER = createLeaseManager();
      DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
      FILE_CREATOR = createFileCreator();
      SHOULD_REPLICATE_FLAG = loadShouldReplicateFlag();
    } catch (Exception e) {
      String msg = "Couldn't properly initialize access to HDFS internals. Please "
        + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
        + "HBASE-16110 for more information.";
      LOG.error(msg, e);
      throw new Error(msg, e);
    }
  }

  static void beginFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.begin(client, inodeId);
  }

  static void endFileLease(DFSClient client, long inodeId) {
    LEASE_MANAGER.end(client, inodeId);
  }

  static DataChecksum createChecksum(DFSClient client) {
    return client.getConf().createChecksum(null);
  }

  static Status getStatus(PipelineAckProto ack) {
    List<Integer> flagList = ack.getFlagList();
    Integer headerFlag;
    if (flagList.isEmpty()) {
      Status reply = ack.getReply(0);
      headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply);
    } else {
      headerFlag = flagList.get(0);
    }
    return PipelineAck.getStatusFromHeader(headerFlag);
  }

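  // Install a temporary response-processing pipeline on the datanode connection: an
  // IdleStateHandler for the read timeout, a varint32 frame decoder plus protobuf decoder for the
  // BlockOpResponseProto, and a handler that fails the promise on error, timeout or disconnect.
  // On success the temporary handlers are removed again, auto read is disabled, and the promise is
  // completed with the connected channel.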
  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
    Promise<Channel> promise, int timeoutMs) {
    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
      new ProtobufVarint32FrameDecoder(),
      new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
      new SimpleChannelInboundHandler<BlockOpResponseProto>() {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
          throws Exception {
          Status pipelineStatus = resp.getStatus();
          if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
            throw new IOException("datanode " + dnInfo + " is restarting");
          }
          String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
          if (resp.getStatus() != Status.SUCCESS) {
            if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
              throw new InvalidBlockTokenException("Got access token error" + ", status message "
                + resp.getMessage() + ", " + logInfo);
            } else {
              throw new IOException("Got error" + ", status=" + resp.getStatus().name()
                + ", status message " + resp.getMessage() + ", " + logInfo);
            }
          }
          // success
          ChannelPipeline p = ctx.pipeline();
          for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
            // do not remove all handlers because we may have wrap or unwrap handlers at the head
            // of the pipeline.
            if (handler instanceof IdleStateHandler) {
              break;
            }
          }
          // Disable auto read here. Enable it after we set up the streaming pipeline in
          // FanOutOneBlockAsyncDFSOutput.
          ctx.channel().config().setAutoRead(false);
          promise.trySuccess(ctx.channel());
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
          if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
            promise
              .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
          } else {
            super.userEventTriggered(ctx, evt);
          }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          promise.tryFailure(cause);
        }
      });
  }

  // Send the OP_WRITE_BLOCK request: a 2-byte data transfer protocol version, a 1-byte opcode and
  // the varint-length-delimited OpWriteBlockProto.
  private static void requestWriteBlock(Channel channel, StorageType storageType,
    OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
    OpWriteBlockProto proto =
      writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build();
    int protoLen = proto.getSerializedSize();
    ByteBuf buffer =
      channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
    buffer.writeByte(Op.WRITE_BLOCK.code);
    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
    safeWriteAndFlush(channel, buffer);
  }

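  // Start SASL negotiation on the freshly connected channel. Once negotiation succeeds, the
  // response-processing handlers are installed first and only then is the write-block request
  // sent, so the reply can never be missed.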
  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
    StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
    DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
    throws IOException {
    Promise<Void> saslPromise = channel.eventLoop().newPromise();
    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
    addListener(saslPromise, new FutureListener<Void>() {

      @Override
      public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
          // setup response processing pipeline first, then send request.
          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
        } else {
          promise.tryFailure(future.cause());
        }
      }
    });
  }

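  // Open one netty channel per datanode of the located block and return one future per datanode.
  // The request uses a pipeline size of 1, i.e. every datanode is written to directly by the
  // client (the "fan-out" in the class name) instead of through a datanode-side pipeline. Each
  // future completes once SASL negotiation and the write-block handshake for that node succeed.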
  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
    String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
    BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass) {
    StorageType[] storageTypes = locatedBlock.getStorageTypes();
    DatanodeInfo[] datanodeInfos = getLocatedBlockLocations(locatedBlock);
    boolean connectToDnViaHostname =
      conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
    blockCopy.setNumBytes(locatedBlock.getBlockSize());
    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
      .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy))
        .setToken(PBHelperClient.convert(locatedBlock.getBlockToken())))
      .setClientName(clientName).build();
    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
    OpWriteBlockProto.Builder writeBlockProtoBuilder =
      OpWriteBlockProto.newBuilder().setHeader(header)
        .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())).setPipelineSize(1)
        .setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()).setMaxBytesRcvd(maxBytesRcvd)
        .setLatestGenerationStamp(latestGS).setRequestedChecksum(checksumProto)
        .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
    for (int i = 0; i < datanodeInfos.length; i++) {
      DatanodeInfo dnInfo = datanodeInfos[i];
      StorageType storageType = storageTypes[i];
      Promise<Channel> promise = eventLoopGroup.next().newPromise();
      futureList.add(promise);
      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
      addListener(new Bootstrap().group(eventLoopGroup).channel(channelClass)
        .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {

          @Override
          protected void initChannel(Channel ch) throws Exception {
            // we need to get the remote address of the channel, so we can only move on after the
            // channel is connected. Leave an empty implementation here because netty does not
            // allow a null handler.
          }
        }).connect(NetUtils.createSocketAddr(dnAddr)), new ChannelFutureListener() {

          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
              initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
                timeoutMs, client, locatedBlock.getBlockToken(), promise);
            } else {
              promise.tryFailure(future.cause());
            }
          }
        });
    }
    return futureList;
  }

  /**
   * Wraps an exception other than RemoteException thrown when calling create on the namenode.
   */
  public static class NameNodeException extends IOException {

    private static final long serialVersionUID = 3143237406477095390L;

    public NameNodeException(Throwable cause) {
      super(cause);
    }
  }

  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite,
    boolean noLocalWrite) {
    List<CreateFlag> flags = new ArrayList<>();
    flags.add(CreateFlag.CREATE);
    if (overwrite) {
      flags.add(CreateFlag.OVERWRITE);
    }
    if (SHOULD_REPLICATE_FLAG != null) {
      flags.add(SHOULD_REPLICATE_FLAG);
    }
    if (noLocalWrite) {
      flags.add(CreateFlag.NO_LOCAL_WRITE);
    }
    return new EnumSetWritable<>(EnumSet.copyOf(flags));
  }

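  // Create the file at the namenode, begin the file lease, allocate the first block while
  // excluding datanodes already flagged by the ExcludeDatanodeManager, and connect to every
  // datanode of that block. A datanode that fails to connect is added to the exclude set, the
  // already opened channels are closed, the lease is released, and the whole sequence is retried
  // (now overwriting the partially created file) with backoff, up to
  // ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES attempts.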
  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor,
    boolean noLocalWrite) throws IOException {
    Configuration conf = dfs.getConf();
    DFSClient client = dfs.getClient();
    String clientName = client.getClientName();
    ClientProtocol namenode = client.getNamenode();
    int createMaxRetries =
      conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
    ExcludeDatanodeManager excludeDatanodeManager = monitor.getExcludeDatanodeManager();
    Set<DatanodeInfo> toExcludeNodes =
      new HashSet<>(excludeDatanodeManager.getExcludeDNs().keySet());
    for (int retry = 0;; retry++) {
      LOG.debug("When create output stream for {}, exclude list is {}, retry={}", src,
        toExcludeNodes, retry);
      HdfsFileStatus stat;
      try {
        stat = FILE_CREATOR.create(namenode, src,
          FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
          getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize,
          CryptoProtocolVersion.supported());
      } catch (Exception e) {
        if (e instanceof RemoteException) {
          throw (RemoteException) e;
        } else {
          throw new NameNodeException(e);
        }
      }
      beginFileLease(client, stat.getFileId());
      boolean succ = false;
      LocatedBlock locatedBlock = null;
      List<Future<Channel>> futureList = null;
      try {
        DataChecksum summer = createChecksum(client);
        locatedBlock = namenode.addBlock(src, client.getClientName(), null,
          toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null);
        Map<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
          PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
        for (int i = 0, n = futureList.size(); i < n; i++) {
          DatanodeInfo datanodeInfo = getLocatedBlockLocations(locatedBlock)[i];
          try {
            datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo);
          } catch (Exception e) {
            // exclude the broken DN next time
            toExcludeNodes.add(datanodeInfo);
            excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "connect error");
            throw e;
          }
        }
        Encryptor encryptor = createEncryptor(conf, stat, client);
        FanOutOneBlockAsyncDFSOutput output =
          new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
            stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
        succ = true;
        return output;
      } catch (RemoteException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (shouldRetryCreate(e)) {
          if (retry >= createMaxRetries) {
            throw e.unwrapRemoteException();
          }
        } else {
          throw e.unwrapRemoteException();
        }
      } catch (IOException e) {
        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
        if (retry >= createMaxRetries) {
          throw e;
        }
        // overwrite the old broken file.
        overwrite = true;
        try {
          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
        } catch (InterruptedException ie) {
          throw new InterruptedIOException();
        }
      } finally {
        if (!succ) {
          if (futureList != null) {
            for (Future<Channel> f : futureList) {
              addListener(f, new FutureListener<Channel>() {

                @Override
                public void operationComplete(Future<Channel> future) throws Exception {
                  if (future.isSuccess()) {
                    safeClose(future.getNow());
                  }
                }
              });
            }
          }
          endFileLease(client, stat.getFileId());
        }
      }
    }
  }

  /**
   * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method may block, so do not call it inside
   * an {@link EventLoop}.
   */
  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
    final StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException {
    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

      @Override
      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
        throws IOException, UnresolvedLinkException {
        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
          blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
      }

      @Override
      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
        throw new UnsupportedOperationException();
      }
    }.resolve(dfs, f);
  }

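  // A minimal usage sketch for the createOutput() overload above (illustrative only; the
  // NioEventLoopGroup/NioSocketChannel combination and the way the StreamSlowMonitor is obtained
  // are assumptions of this sketch, not requirements of the API):
  //
  //   EventLoopGroup group = new NioEventLoopGroup();
  //   FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(dfs,
  //     new Path("/example/file"), true, false, (short) 3, 64 * 1024 * 1024, group,
  //     NioSocketChannel.class, monitor, false);
  //   // write through the returned output; closing it completes the file at the namenode.
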
  public static boolean shouldRetryCreate(RemoteException e) {
    // RetryStartFileException is introduced in HDFS 2.6+, so here we can only use the class name.
    // For exceptions other than this, we just throw it out. This is the same as
    // DFSOutputStream.newStreamForCreate.
    return e.getClassName().endsWith("RetryStartFileException");
  }

  // Complete the file at the namenode, retrying with backoff until it succeeds or the lease has
  // expired; interrupts during the backoff sleep are ignored.
  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
    ExtendedBlock block, long fileId) {
    for (int retry = 0;; retry++) {
      try {
        if (namenode.complete(src, clientName, block, fileId)) {
          endFileLease(client, fileId);
          return;
        } else {
          LOG.warn("complete file " + src + " not finished, retry = " + retry);
        }
      } catch (RemoteException e) {
        IOException ioe = e.unwrapRemoteException();
        if (ioe instanceof LeaseExpiredException) {
          LOG.warn("lease for file " + src + " is expired, give up", e);
          return;
        } else {
          LOG.warn("complete file " + src + " failed, retry = " + retry, e);
        }
      } catch (Exception e) {
        LOG.warn("complete file " + src + " failed, retry = " + retry, e);
      }
      sleepIgnoreInterrupt(retry);
    }
  }

  static void sleepIgnoreInterrupt(int retry) {
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
    } catch (InterruptedException e) {
    }
  }
}