/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.util.FutureUtils.consume;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.FixMethodOrder;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@Category({ MiscTests.class, MediumTests.class })
public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutput.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutput.class);
  private static DistributedFileSystem FS;
  private static EventLoopGroup EVENT_LOOP_GROUP;
  private static Class<? extends Channel> CHANNEL_CLASS;
  private static int READ_TIMEOUT_MS = 2000;

  private static StreamSlowMonitor MONITOR;

  @Rule
  public TestName name = new TestName();
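  /**
   * Starts a 3-datanode mini DFS cluster with the DFS client socket timeout lowered to 2 seconds,
   * so the heartbeat test below can sleep past the timeout without slowing down the whole suite.
   */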
  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
    startMiniDFSCluster(3);
    FS = CLUSTER.getFileSystem();
    EVENT_LOOP_GROUP = new NioEventLoopGroup();
    CHANNEL_CLASS = NioSocketChannel.class;
    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
  }

  @AfterClass
  public static void tearDown() throws Exception {
    if (EVENT_LOOP_GROUP != null) {
      EVENT_LOOP_GROUP.shutdownGracefully().get();
    }
    shutdownMiniDFSCluster();
  }

  private static final Random RNG = new Random(); // This test depends on Random#setSeed

  static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
    throws IOException, InterruptedException, ExecutionException {
    List<CompletableFuture<Long>> futures = new ArrayList<>();
    byte[] b = new byte[10];
    // test pipelined flush: issue two flushes per write without waiting for completion
    RNG.setSeed(12345);
    for (int i = 0; i < 10; i++) {
      RNG.nextBytes(b);
      out.write(b);
      futures.add(out.flush(false));
      futures.add(out.flush(false));
    }
    // both flushes of each batch should report the same acked length
    for (int i = 0; i < 10; i++) {
      assertEquals((i + 1) * b.length, futures.get(2 * i).join().longValue());
      assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
    }
    out.close();
    assertEquals(b.length * 10, fs.getFileStatus(f).getLen());
    // re-read the file and verify its contents against the same seeded random sequence
    byte[] actual = new byte[b.length];
    RNG.setSeed(12345);
    try (FSDataInputStream in = fs.open(f)) {
      for (int i = 0; i < 10; i++) {
        in.readFully(actual);
        RNG.nextBytes(b);
        assertArrayEquals(b, actual);
      }
      assertEquals(-1, in.read());
    }
  }
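  /**
   * Happy path: write against a healthy 3-datanode pipeline and verify the contents via
   * {@link #writeAndVerify(FileSystem, Path, AsyncFSOutput)}.
   */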
  @Test
  public void test() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
    writeAndVerify(FS, f, out);
  }

  /**
   * This test method has been renamed so that it sorts first under NAME_ASCENDING ordering. It's
   * an ugly hack to avoid flakiness.
   */
  @Test
  public void test0Recover() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
    byte[] b = new byte[10];
    Bytes.random(b);
    out.write(b, 0, b.length);
    out.flush(false).get();
    // restart one datanode, which breaks one of the connections in the pipeline
    CLUSTER.restartDataNode(0);
    out.write(b, 0, b.length);
    try {
      out.flush(false).get();
      fail("flush should fail");
    } catch (ExecutionException e) {
      // we restarted one datanode, so the flush should fail
      LOG.info("expected exception caught", e);
    }
    // recoverAndClose should salvage the data that was flushed before the failure
    out.recoverAndClose(null);
    assertEquals(b.length, FS.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(f)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }

  @Test
  public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
    Thread.sleep(READ_TIMEOUT_MS * 2);
    // the connections to the datanodes should still be alive
    writeAndVerify(FS, f, out);
  }

  /**
   * This is important for fencing when recovering from an RS crash.
   */
  @Test
  public void testCreateParentFailed() throws IOException {
    Path f = new Path("/" + name.getMethodName() + "/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    try {
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
      fail("should fail because the parent directory does not exist");
    } catch (RemoteException e) {
      LOG.info("expected exception caught", e);
      assertThat(e.unwrapRemoteException(), instanceOf(FileNotFoundException.class));
    }
  }

  @Test
  public void testConnectToDatanodeFailed()
    throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
    InvocationTargetException, InterruptedException, NoSuchFieldException {
    Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
    xceiverServerDaemonField.setAccessible(true);
    Class<?> xceiverServerClass =
      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
    numPeersMethod.setAccessible(true);
    // take one datanode down
    DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
    Path f = new Path("/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    try (FanOutOneBlockAsyncDFSOutput output =
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true)) {
      // the dead datanode should be excluded on retry, so only 2 DNs remain in the pipeline
      assertEquals(2, output.getPipeline().length);
    } finally {
      CLUSTER.restartDataNode(dnProp);
      CLUSTER.triggerBlockReports();
    }
  }
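  /**
   * Same scenario as testConnectToDatanodeFailed, but additionally verifies that the unreachable
   * datanode ends up in the {@link ExcludeDatanodeManager}'s exclude list.
   */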
  @Test
  public void testExcludeFailedConnectToDatanode()
    throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
    InvocationTargetException, InterruptedException, NoSuchFieldException {
    Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
    xceiverServerDaemonField.setAccessible(true);
    Class<?> xceiverServerClass =
      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
    numPeersMethod.setAccessible(true);
    // take one datanode down
    DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
    Path f = new Path("/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    ExcludeDatanodeManager excludeDatanodeManager =
      new ExcludeDatanodeManager(HBaseConfiguration.create());
    StreamSlowMonitor streamSlowDNsMonitor =
      excludeDatanodeManager.getStreamSlowMonitor("testMonitor");
    assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
    try (FanOutOneBlockAsyncDFSOutput output =
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor, true)) {
      // the dead datanode should be excluded on retry, so only 2 DNs remain in the pipeline,
      // and it should now be tracked by the exclude manager
      assertEquals(2, output.getPipeline().length);
      assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
    } finally {
      CLUSTER.restartDataNode(dnProp);
      CLUSTER.triggerBlockReports();
    }
  }

  @Test
  public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    // use a 1 GB block size so the 50 MB chunk stays within a single block
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR, true);
    byte[] b = new byte[50 * 1024 * 1024];
    Bytes.random(b);
    out.write(b);
    consume(out.flush(false));
    assertEquals(b.length, out.flush(false).get().longValue());
    out.close();
    assertEquals(b.length, FS.getFileStatus(f).getLen());
    byte[] actual = new byte[b.length];
    try (FSDataInputStream in = FS.open(f)) {
      in.readFully(actual);
    }
    assertArrayEquals(b, actual);
  }
}