/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.fs;

import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.ServerSocket;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the hdfs fix from HBASE-6435. Please don't add new tests that involve starting or
 * stopping a MiniDFSCluster to this class. When a MiniDFSCluster is stopped, shutdown hooks are
 * cleared from hadoop's ShutdownHookManager in hadoop 3, which leads to a 'Failed suppression of
 * fs shutdown hook' error in the region server.
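 * <p>
 * The hook under test is installed through
 * {@code HFileSystem.addLocationsOrderInterceptor(Configuration, ReorderBlocks)}. A minimal
 * sketch of the registration, mirroring the anonymous implementation used in
 * {@code testBlockLocationReorder} below:
 *
 * <pre>
 * HFileSystem.addLocationsOrderInterceptor(conf, new HFileSystem.ReorderBlocks() {
 *   public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
 *     // move a known-bad replica away from the front of each block's location list
 *   }
 * });
 * </pre>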
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestBlockReorder {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBlockReorder.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockReorder.class);

  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtil htu;
  private DistributedFileSystem dfs;
  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";

  @Rule
  public TestName name = new TestName();

  @Before
  public void setUp() throws Exception {
    htu = new HBaseTestingUtil();
    htu.getConfiguration().setInt("dfs.blocksize", 1024); // For the test with multiple blocks
    htu.getConfiguration().setInt("dfs.replication", 3);
    htu.startMiniDFSCluster(3, new String[] { "/r1", "/r2", "/r3" },
      new String[] { host1, host2, host3 });

    conf = htu.getConfiguration();
    cluster = htu.getDFSCluster();
    dfs = (DistributedFileSystem) FileSystem.get(conf);
  }

  @After
  public void tearDown() throws Exception {
    htu.shutdownMiniCluster();
  }

  /**
   * Test that we can add a hook, and that this hook works when we try to read the file in HDFS.
   */
  @Test
  public void testBlockLocationReorder() throws Exception {
    Path p = new Path("hello");

    Assert.assertTrue((short) cluster.getDataNodes().size() > 1);
    final int repCount = 2;

    // Let's write the file
    FSDataOutputStream fop = dfs.create(p, (short) repCount);
    final double toWrite = 875.5613;
    fop.writeDouble(toWrite);
    fop.close();

    // Let's check we can read it when everybody's there
    long start = EnvironmentEdgeManager.currentTime();
    FSDataInputStream fin = dfs.open(p);
    Assert.assertTrue(toWrite == fin.readDouble());
    long end = EnvironmentEdgeManager.currentTime();
    LOG.info("readtime= " + (end - start));
    fin.close();
    Assert.assertTrue((end - start) < 30 * 1000);

    // Let's kill the first location. But actually the first location returned will change.
    // The first thing to do is to get the location, then the port.
    FileStatus f = dfs.getFileStatus(p);
    BlockLocation[] lbs;
    do {
      lbs = dfs.getFileBlockLocations(f, 0, 1);
    } while (lbs.length != 1 && lbs[0].getLength() != repCount);
    final String name = lbs[0].getNames()[0];
    Assert.assertTrue(name.indexOf(':') > 0);
    String portS = name.substring(name.indexOf(':') + 1);
    final int port = Integer.parseInt(portS);
    LOG.info("port= " + port);
    int ipcPort = -1;

    // Let's find the DN to kill. cluster.getDataNodes(int) is not on the same port, so we need
    // to iterate ourselves.
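    // We match each datanode by hostname; when we hit the one hosting the first replica we
    // remember its IPC port (so we can occupy it below) and shut it down.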
    boolean ok = false;
    final String lookup = lbs[0].getHosts()[0];
    StringBuilder sb = new StringBuilder();
    for (DataNode dn : cluster.getDataNodes()) {
      final String dnName = getHostName(dn);
      sb.append(dnName).append(' ');
      if (lookup.equals(dnName)) {
        ok = true;
        LOG.info("killing datanode " + name + " / " + lookup);
        ipcPort = dn.ipcServer.getListenerAddress().getPort();
        dn.shutdown();
        LOG.info("killed datanode " + name + " / " + lookup);
        break;
      }
    }
    Assert.assertTrue("didn't find the server to kill, was looking for " + lookup + " found " + sb,
      ok);
    LOG.info("ipc port= " + ipcPort);

    // Add the hook, with an implementation checking that we don't use the port we've just killed.
    Assert
      .assertTrue(HFileSystem.addLocationsOrderInterceptor(conf, new HFileSystem.ReorderBlocks() {
        @Override
        public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
          for (LocatedBlock lb : lbs.getLocatedBlocks()) {
            if (getLocatedBlockLocations(lb).length > 1) {
              DatanodeInfo[] infos = getLocatedBlockLocations(lb);
              if (infos[0].getHostName().equals(lookup)) {
                LOG.info("HFileSystem bad host, inverting");
                DatanodeInfo tmp = infos[0];
                infos[0] = infos[1];
                infos[1] = tmp;
              }
            }
          }
        }
      }));

    final int retries = 10;
    ServerSocket ss = null;
    ServerSocket ssI;
    try {
      ss = new ServerSocket(port); // We're taking the port to have a timeout issue later.
      ssI = new ServerSocket(ipcPort);
    } catch (BindException be) {
      LOG.warn("Got bind exception trying to set up socket on " + port + " or " + ipcPort
        + ", this means that the datanode has not closed the socket or"
        + " someone else took it. It may happen, skipping this test for this time.", be);
      if (ss != null) {
        ss.close();
      }
      return;
    }

    // Now it will fail with a timeout, and unfortunately it does not always connect to the same
    // box, so we try 'retries' times; with the reorder it will never take more than a few
    // milliseconds.
    for (int i = 0; i < retries; i++) {
      start = EnvironmentEdgeManager.currentTime();
      fin = dfs.open(p);
      Assert.assertTrue(toWrite == fin.readDouble());
      fin.close();
      end = EnvironmentEdgeManager.currentTime();
      LOG.info("HFileSystem readtime= " + (end - start));
      Assert.assertFalse("We took too much time to read", (end - start) > 60000);
    }

    ss.close();
    ssI.close();
  }

  /**
   * Gets the hostname, using getDisplayName (hadoop 2) or falling back to getHostName (hadoop 1).
   */
  private String getHostName(DataNode dn) throws InvocationTargetException, IllegalAccessException {
    Method m;
    try {
      m = DataNode.class.getMethod("getDisplayName");
    } catch (NoSuchMethodException e) {
      try {
        m = DataNode.class.getMethod("getHostName");
      } catch (NoSuchMethodException e1) {
        throw new RuntimeException(e1);
      }
    }

    String res = (String) m.invoke(dn);
    if (res.contains(":")) {
      return res.split(":")[0];
    } else {
      return res;
    }
  }

}