001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import static org.junit.Assert.assertTrue; 021import static org.junit.Assert.fail; 022 023import java.util.ArrayList; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.concurrent.CompletableFuture; 028import java.util.concurrent.CyclicBarrier; 029import java.util.concurrent.ExecutionException; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 033import org.apache.hadoop.hbase.testclassification.MediumTests; 034import org.apache.hadoop.hbase.testclassification.MiscTests; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.hdfs.DistributedFileSystem; 037import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; 038import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 039import org.apache.hadoop.hdfs.server.datanode.DataNode; 040import org.junit.AfterClass; 041import org.junit.BeforeClass; 042import org.junit.ClassRule; 043import org.junit.Rule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046import org.junit.rules.TestName; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 051import org.apache.hbase.thirdparty.io.netty.channel.Channel; 052import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 053import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter; 054import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 055import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 056import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 057import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 058 059/** 060 * Testcase for HBASE-26679, here we introduce a separate test class and not put the testcase in 061 * {@link TestFanOutOneBlockAsyncDFSOutput} because we will send heartbeat to DN when there is no 062 * out going packet, the timeout is controlled by 063 * {@link TestFanOutOneBlockAsyncDFSOutput#READ_TIMEOUT_MS},which is 2 seconds, it will keep sending 064 * package out and DN will respond immedately and then mess up the testing handler added by us. So 065 * in this test class we use the default value for timeout which is 60 seconds and it is enough for 066 * this test. 067 */ 068@Category({ MiscTests.class, MediumTests.class }) 069public class TestFanOutOneBlockAsyncDFSOutputHang extends AsyncFSTestBase { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutputHang.class); 074 075 private static final Logger LOG = 076 LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutputHang.class); 077 078 private static DistributedFileSystem FS; 079 080 private static EventLoopGroup EVENT_LOOP_GROUP; 081 082 private static Class<? extends Channel> CHANNEL_CLASS; 083 084 private static StreamSlowMonitor MONITOR; 085 086 private static FanOutOneBlockAsyncDFSOutput OUT; 087 088 @Rule 089 public TestName name = new TestName(); 090 091 @BeforeClass 092 public static void setUp() throws Exception { 093 startMiniDFSCluster(2); 094 FS = CLUSTER.getFileSystem(); 095 EVENT_LOOP_GROUP = new NioEventLoopGroup(); 096 CHANNEL_CLASS = NioSocketChannel.class; 097 MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor"); 098 Path f = new Path("/testHang"); 099 EventLoop eventLoop = EVENT_LOOP_GROUP.next(); 100 OUT = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 2, 101 FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); 102 } 103 104 @AfterClass 105 public static void tearDown() throws Exception { 106 if (OUT != null) { 107 OUT.recoverAndClose(null); 108 } 109 if (EVENT_LOOP_GROUP != null) { 110 EVENT_LOOP_GROUP.shutdownGracefully().get(); 111 } 112 shutdownMiniDFSCluster(); 113 } 114 115 /** 116 * <pre> 117 * This test is for HBASE-26679. Consider there are two dataNodes: dn1 and dn2,dn2 is a slow DN. 118 * The threads sequence before HBASE-26679 is: 119 * 1.We write some data to {@link FanOutOneBlockAsyncDFSOutput} and then flush it, there are one 120 * {@link FanOutOneBlockAsyncDFSOutput.Callback} in 121 * {@link FanOutOneBlockAsyncDFSOutput#waitingAckQueue}. 122 * 2.The ack from dn1 arrives firstly and triggers Netty to invoke 123 * {@link FanOutOneBlockAsyncDFSOutput#completed} with dn1's channel, then in 124 * {@link FanOutOneBlockAsyncDFSOutput#completed}, dn1's channel is removed from 125 * {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas}. 126 * 3.But dn2 responds slowly, before dn2 sending ack,dn1 is shut down or have a exception, 127 * so {@link FanOutOneBlockAsyncDFSOutput#failed} is triggered by Netty with dn1's channel, 128 * and because the {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas} does not 129 * contain dn1's channel,the {@link FanOutOneBlockAsyncDFSOutput.Callback} is skipped in 130 * {@link FanOutOneBlockAsyncDFSOutput#failed} method,and 131 * {@link FanOutOneBlockAsyncDFSOutput#state} is set to 132 * {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN},and dn1,dn2 are all closed at the end of 133 * {@link FanOutOneBlockAsyncDFSOutput#failed}. 134 * 4.{@link FanOutOneBlockAsyncDFSOutput#failed} is triggered again by dn2 because it is closed, 135 * but because {@link FanOutOneBlockAsyncDFSOutput#state} is already 136 * {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN},the whole 137 * {@link FanOutOneBlockAsyncDFSOutput#failed} is skipped. So wait on the future 138 * returned by {@link FanOutOneBlockAsyncDFSOutput#flush} would be stuck for ever. 139 * After HBASE-26679, for above step 4,even if the {@link FanOutOneBlockAsyncDFSOutput#state} 140 * is already {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN}, we would still try to trigger 141 * {@link FanOutOneBlockAsyncDFSOutput.Callback#future}. 142 * </pre> 143 */ 144 @Test 145 public void testFlushHangWhenOneDataNodeFailedBeforeOtherDataNodeAck() throws Exception { 146 147 DataNodeProperties firstDataNodeProperties = null; 148 try { 149 150 final CyclicBarrier dn1AckReceivedCyclicBarrier = new CyclicBarrier(2); 151 Map<Channel, DatanodeInfo> datanodeInfoMap = OUT.getDatanodeInfoMap(); 152 Iterator<Map.Entry<Channel, DatanodeInfo>> iterator = datanodeInfoMap.entrySet().iterator(); 153 assertTrue(iterator.hasNext()); 154 Map.Entry<Channel, DatanodeInfo> dn1Entry = iterator.next(); 155 Channel dn1Channel = dn1Entry.getKey(); 156 DatanodeInfo dn1DatanodeInfo = dn1Entry.getValue(); 157 final List<String> protobufDecoderNames = new ArrayList<String>(); 158 dn1Channel.pipeline().forEach((entry) -> { 159 if (ProtobufDecoder.class.isInstance(entry.getValue())) { 160 protobufDecoderNames.add(entry.getKey()); 161 } 162 }); 163 assertTrue(protobufDecoderNames.size() == 1); 164 dn1Channel.pipeline().addAfter(protobufDecoderNames.get(0), "dn1AckReceivedHandler", 165 new ChannelInboundHandlerAdapter() { 166 @Override 167 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 168 super.channelRead(ctx, msg); 169 dn1AckReceivedCyclicBarrier.await(); 170 } 171 }); 172 173 assertTrue(iterator.hasNext()); 174 Map.Entry<Channel, DatanodeInfo> dn2Entry = iterator.next(); 175 Channel dn2Channel = dn2Entry.getKey(); 176 177 /** 178 * Here we add a {@link ChannelInboundHandlerAdapter} to eat all the responses to simulate a 179 * slow dn2. 180 */ 181 dn2Channel.pipeline().addFirst(new ChannelInboundHandlerAdapter() { 182 183 @Override 184 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 185 if (!(msg instanceof ByteBuf)) { 186 ctx.fireChannelRead(msg); 187 } else { 188 ((ByteBuf) msg).release(); 189 } 190 } 191 }); 192 193 byte[] b = new byte[10]; 194 Bytes.random(b); 195 OUT.write(b, 0, b.length); 196 CompletableFuture<Long> future = OUT.flush(false); 197 /** 198 * Wait for ack from dn1. 199 */ 200 dn1AckReceivedCyclicBarrier.await(); 201 /** 202 * First ack is received from dn1,we could stop dn1 now. 203 */ 204 firstDataNodeProperties = findAndKillFirstDataNode(dn1DatanodeInfo); 205 assertTrue(firstDataNodeProperties != null); 206 try { 207 /** 208 * Before HBASE-26679,here we should be stuck, after HBASE-26679,we would fail soon with 209 * {@link ExecutionException}. 210 */ 211 future.get(); 212 fail(); 213 } catch (ExecutionException e) { 214 assertTrue(e != null); 215 LOG.info("expected exception caught when get future", e); 216 } 217 /** 218 * Make sure all the data node channel are closed. 219 */ 220 datanodeInfoMap.keySet().forEach(ch -> { 221 try { 222 ch.closeFuture().get(); 223 } catch (InterruptedException | ExecutionException e) { 224 throw new RuntimeException(e); 225 } 226 }); 227 } finally { 228 if (firstDataNodeProperties != null) { 229 CLUSTER.restartDataNode(firstDataNodeProperties); 230 } 231 } 232 } 233 234 private static DataNodeProperties findAndKillFirstDataNode(DatanodeInfo firstDatanodeInfo) { 235 assertTrue(firstDatanodeInfo != null); 236 ArrayList<DataNode> dataNodes = CLUSTER.getDataNodes(); 237 ArrayList<Integer> foundIndexes = new ArrayList<Integer>(); 238 int index = 0; 239 for (DataNode dataNode : dataNodes) { 240 if (firstDatanodeInfo.getXferAddr().equals(dataNode.getDatanodeId().getXferAddr())) { 241 foundIndexes.add(index); 242 } 243 index++; 244 } 245 assertTrue(foundIndexes.size() == 1); 246 return CLUSTER.stopDataNode(foundIndexes.get(0)); 247 } 248 249}