001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.fs; 019 020import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations; 021 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.lang.reflect.Field; 025import java.util.List; 026import java.util.concurrent.CountDownLatch; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.BlockLocation; 029import org.apache.hadoop.fs.FileStatus; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.MiniHBaseCluster; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.Put; 038import org.apache.hadoop.hbase.client.Table; 039import org.apache.hadoop.hbase.master.LoadBalancer; 040import org.apache.hadoop.hbase.regionserver.HRegion; 041import org.apache.hadoop.hbase.regionserver.HRegionServer; 042import org.apache.hadoop.hbase.regionserver.Region; 043import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 044import org.apache.hadoop.hbase.testclassification.LargeTests; 045import org.apache.hadoop.hbase.testclassification.MiscTests; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.hbase.util.CommonFSUtils; 048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 049import org.apache.hadoop.hdfs.DFSClient; 050import org.apache.hadoop.hdfs.DistributedFileSystem; 051import org.apache.hadoop.hdfs.MiniDFSCluster; 052import org.apache.hadoop.hdfs.protocol.ClientProtocol; 053import org.apache.hadoop.hdfs.protocol.DirectoryListing; 054import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 055import org.apache.hadoop.hdfs.protocol.LocatedBlocks; 056import org.apache.hadoop.ipc.RemoteException; 057import org.junit.After; 058import org.junit.Assert; 059import org.junit.Before; 060import org.junit.ClassRule; 061import org.junit.Rule; 062import org.junit.Test; 063import org.junit.experimental.categories.Category; 064import org.junit.rules.TestName; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068/** 069 * Tests for the hdfs fix from HBASE-6435. Please don't add new subtest which involves starting / 070 * stopping MiniDFSCluster in this class. When stopping MiniDFSCluster, shutdown hooks would be 071 * cleared in hadoop's ShutdownHookManager in hadoop 3. This leads to 'Failed suppression of fs 072 * shutdown hook' error in region server. 073 */ 074@Category({ MiscTests.class, LargeTests.class }) 075public class TestBlockReorderMultiBlocks { 076 077 @ClassRule 078 public static final HBaseClassTestRule CLASS_RULE = 079 HBaseClassTestRule.forClass(TestBlockReorderMultiBlocks.class); 080 081 private static final Logger LOG = LoggerFactory.getLogger(TestBlockReorderMultiBlocks.class); 082 083 private Configuration conf; 084 private MiniDFSCluster cluster; 085 private HBaseTestingUtility htu; 086 private DistributedFileSystem dfs; 087 private static final String host1 = "host1"; 088 private static final String host2 = "host2"; 089 private static final String host3 = "host3"; 090 091 @Rule 092 public TestName name = new TestName(); 093 094 @Before 095 public void setUp() throws Exception { 096 htu = new HBaseTestingUtility(); 097 htu.getConfiguration().setInt("dfs.blocksize", 1024);// For the test with multiple blocks 098 htu.getConfiguration().setInt("dfs.replication", 3); 099 htu.startMiniDFSCluster(3, new String[] { "/r1", "/r2", "/r3" }, 100 new String[] { host1, host2, host3 }); 101 102 conf = htu.getConfiguration(); 103 cluster = htu.getDFSCluster(); 104 dfs = (DistributedFileSystem) FileSystem.get(conf); 105 } 106 107 @After 108 public void tearDownAfterClass() throws Exception { 109 htu.shutdownMiniCluster(); 110 } 111 112 /** 113 * Test that the hook works within HBase, including when there are multiple blocks. 114 */ 115 @Test() 116 public void testHBaseCluster() throws Exception { 117 byte[] sb = Bytes.toBytes("sb"); 118 htu.startMiniZKCluster(); 119 120 MiniHBaseCluster hbm = htu.startMiniHBaseCluster(); 121 hbm.waitForActiveAndReadyMaster(); 122 HRegionServer targetRs = 123 LoadBalancer.isTablesOnMaster(hbm.getConf()) ? hbm.getMaster() : hbm.getRegionServer(0); 124 125 // We want to have a datanode with the same name as the region server, so 126 // we're going to get the regionservername, and start a new datanode with this name. 127 String host4 = targetRs.getServerName().getHostname(); 128 LOG.info("Starting a new datanode with the name=" + host4); 129 cluster.startDataNodes(conf, 1, true, null, new String[] { "/r4" }, new String[] { host4 }, 130 null); 131 cluster.waitClusterUp(); 132 133 final int repCount = 3; 134 135 // We use the regionserver file system & conf as we expect it to have the hook. 136 conf = targetRs.getConfiguration(); 137 HFileSystem rfs = (HFileSystem) targetRs.getFileSystem(); 138 Table h = htu.createTable(TableName.valueOf(name.getMethodName()), sb); 139 140 // Now, we have 4 datanodes and a replication count of 3. So we don't know if the datanode 141 // with the same node will be used. We can't really stop an existing datanode, this would 142 // make us fall in nasty hdfs bugs/issues. So we're going to try multiple times. 143 144 // Now we need to find the log file, its locations, and look at it 145 146 String rootDir = 147 new Path(CommonFSUtils.getWALRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME + "/" 148 + targetRs.getServerName().toString()).toUri().getPath(); 149 150 DistributedFileSystem mdfs = 151 (DistributedFileSystem) hbm.getMaster().getMasterFileSystem().getFileSystem(); 152 153 int nbTest = 0; 154 while (nbTest < 10) { 155 final List<HRegion> regions = targetRs.getRegions(h.getName()); 156 final CountDownLatch latch = new CountDownLatch(regions.size()); 157 // listen for successful log rolls 158 final WALActionsListener listener = new WALActionsListener() { 159 @Override 160 public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { 161 latch.countDown(); 162 } 163 }; 164 for (HRegion region : regions) { 165 region.getWAL().registerWALActionsListener(listener); 166 } 167 168 htu.getAdmin().rollWALWriter(targetRs.getServerName()); 169 170 // wait 171 try { 172 latch.await(); 173 } catch (InterruptedException exception) { 174 LOG.warn("Interrupted while waiting for the wal of '" + targetRs + "' to roll. If later " 175 + "tests fail, it's probably because we should still be waiting."); 176 Thread.currentThread().interrupt(); 177 } 178 for (Region region : regions) { 179 ((HRegion) region).getWAL().unregisterWALActionsListener(listener); 180 } 181 182 // We need a sleep as the namenode is informed asynchronously 183 Thread.sleep(100); 184 185 // insert one put to ensure a minimal size 186 Put p = new Put(sb); 187 p.addColumn(sb, sb, sb); 188 h.put(p); 189 190 DirectoryListing dl = dfs.getClient().listPaths(rootDir, HdfsFileStatus.EMPTY_NAME); 191 HdfsFileStatus[] hfs = dl.getPartialListing(); 192 193 // As we wrote a put, we should have at least one log file. 194 Assert.assertTrue(hfs.length >= 1); 195 for (HdfsFileStatus hf : hfs) { 196 // Because this is a live cluster, log files might get archived while we're processing 197 try { 198 LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir); 199 String logFile = rootDir + "/" + hf.getLocalName(); 200 FileStatus fsLog = rfs.getFileStatus(new Path(logFile)); 201 202 LOG.info("Checking log file: " + logFile); 203 // Now checking that the hook is up and running 204 // We can't call directly getBlockLocations, it's not available in HFileSystem 205 // We're trying multiple times to be sure, as the order is random 206 207 BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1); 208 if (bls.length > 0) { 209 BlockLocation bl = bls[0]; 210 211 LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile + " "); 212 for (int i = 0; i < bl.getHosts().length - 1; i++) { 213 LOG.info(bl.getHosts()[i] + " " + logFile); 214 Assert.assertNotSame(bl.getHosts()[i], host4); 215 } 216 String last = bl.getHosts()[bl.getHosts().length - 1]; 217 LOG.info(last + " " + logFile); 218 if (host4.equals(last)) { 219 nbTest++; 220 LOG.info(logFile + " is on the new datanode and is ok"); 221 if (bl.getHosts().length == 3) { 222 // We can test this case from the file system as well 223 // Checking the underlying file system. Multiple times as the order is random 224 testFromDFS(dfs, logFile, repCount, host4); 225 226 // now from the master 227 testFromDFS(mdfs, logFile, repCount, host4); 228 } 229 } 230 } 231 } catch (FileNotFoundException exception) { 232 LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " 233 + "archived out from under us so we'll ignore and retry. If this test hangs " 234 + "indefinitely you should treat this failure as a symptom.", exception); 235 } catch (RemoteException exception) { 236 if (exception.unwrapRemoteException() instanceof FileNotFoundException) { 237 LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " 238 + "archived out from under us so we'll ignore and retry. If this test hangs " 239 + "indefinitely you should treat this failure as a symptom.", exception); 240 } else { 241 throw exception; 242 } 243 } 244 } 245 } 246 } 247 248 private void testFromDFS(DistributedFileSystem dfs, String src, int repCount, String localhost) 249 throws Exception { 250 // Multiple times as the order is random 251 for (int i = 0; i < 10; i++) { 252 LocatedBlocks lbs; 253 // The NN gets the block list asynchronously, so we may need multiple tries to get the list 254 final long max = EnvironmentEdgeManager.currentTime() + 10000; 255 boolean done; 256 do { 257 Assert.assertTrue("Can't get enouth replica", EnvironmentEdgeManager.currentTime() < max); 258 lbs = getNamenode(dfs.getClient()).getBlockLocations(src, 0, 1); 259 Assert.assertNotNull("Can't get block locations for " + src, lbs); 260 Assert.assertNotNull(lbs.getLocatedBlocks()); 261 Assert.assertTrue(lbs.getLocatedBlocks().size() > 0); 262 263 done = true; 264 for (int y = 0; y < lbs.getLocatedBlocks().size() && done; y++) { 265 done = getLocatedBlockLocations(lbs.get(y)).length == repCount; 266 } 267 } while (!done); 268 269 for (int y = 0; y < lbs.getLocatedBlocks().size() && done; y++) { 270 Assert.assertEquals(localhost, 271 getLocatedBlockLocations(lbs.get(y))[repCount - 1].getHostName()); 272 } 273 } 274 } 275 276 private static ClientProtocol getNamenode(DFSClient dfsc) throws Exception { 277 Field nf = DFSClient.class.getDeclaredField("namenode"); 278 nf.setAccessible(true); 279 return (ClientProtocol) nf.get(dfsc); 280 } 281 282}