001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertTrue; 026 027import java.io.File; 028import java.io.IOException; 029import java.net.URI; 030import java.net.URISyntaxException; 031import java.util.List; 032import java.util.Random; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FSDataInputStream; 035import org.apache.hadoop.fs.FSDataOutputStream; 036import org.apache.hadoop.fs.FileStatus; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.LocalFileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.fs.StreamCapabilities; 041import org.apache.hadoop.fs.permission.FsPermission; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HBaseConfiguration; 044import org.apache.hadoop.hbase.HBaseTestingUtil; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.HDFSBlocksDistribution; 047import org.apache.hadoop.hbase.client.RegionInfoBuilder; 048import org.apache.hadoop.hbase.exceptions.DeserializationException; 049import org.apache.hadoop.hbase.fs.HFileSystem; 050import org.apache.hadoop.hbase.testclassification.MediumTests; 051import org.apache.hadoop.hbase.testclassification.MiscTests; 052import org.apache.hadoop.hdfs.DFSConfigKeys; 053import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; 054import org.apache.hadoop.hdfs.DFSTestUtil; 055import org.apache.hadoop.hdfs.DistributedFileSystem; 056import org.apache.hadoop.hdfs.MiniDFSCluster; 057import org.apache.hadoop.hdfs.client.HdfsDataInputStream; 058import org.junit.Assert; 059import org.junit.Before; 060import org.junit.ClassRule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066/** 067 * Test {@link FSUtils}. 068 */ 069@Category({ MiscTests.class, MediumTests.class }) 070public class TestFSUtils { 071 072 @ClassRule 073 public static final HBaseClassTestRule CLASS_RULE = 074 HBaseClassTestRule.forClass(TestFSUtils.class); 075 076 private static final Logger LOG = LoggerFactory.getLogger(TestFSUtils.class); 077 078 private HBaseTestingUtil htu; 079 private FileSystem fs; 080 private Configuration conf; 081 082 @Before 083 public void setUp() throws IOException { 084 htu = new HBaseTestingUtil(); 085 fs = htu.getTestFileSystem(); 086 conf = htu.getConfiguration(); 087 } 088 089 @Test 090 public void testIsHDFS() throws Exception { 091 assertFalse(CommonFSUtils.isHDFS(conf)); 092 MiniDFSCluster cluster = null; 093 try { 094 cluster = htu.startMiniDFSCluster(1); 095 assertTrue(CommonFSUtils.isHDFS(conf)); 096 assertTrue(FSUtils.supportSafeMode(cluster.getFileSystem())); 097 FSUtils.checkDfsSafeMode(conf); 098 } finally { 099 if (cluster != null) { 100 cluster.shutdown(); 101 } 102 } 103 } 104 105 @Test 106 public void testLocalFileSystemSafeMode() throws Exception { 107 conf.setClass("fs.file.impl", LocalFileSystem.class, FileSystem.class); 108 assertFalse(CommonFSUtils.isHDFS(conf)); 109 assertFalse(FSUtils.supportSafeMode(FileSystem.get(conf))); 110 FSUtils.checkDfsSafeMode(conf); 111 } 112 113 private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) throws Exception { 114 FSDataOutputStream out = fs.create(file); 115 byte[] data = new byte[dataSize]; 116 out.write(data, 0, dataSize); 117 out.close(); 118 } 119 120 @Test 121 public void testComputeHDFSBlocksDistributionByInputStream() throws Exception { 122 testComputeHDFSBlocksDistribution((fs, testFile) -> { 123 try (FSDataInputStream open = fs.open(testFile)) { 124 assertTrue(open instanceof HdfsDataInputStream); 125 return FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) open); 126 } 127 }); 128 } 129 130 @Test 131 public void testComputeHDFSBlockDistribution() throws Exception { 132 testComputeHDFSBlocksDistribution((fs, testFile) -> { 133 FileStatus status = fs.getFileStatus(testFile); 134 return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen()); 135 }); 136 } 137 138 @FunctionalInterface 139 interface HDFSBlockDistributionFunction { 140 HDFSBlocksDistribution getForPath(FileSystem fs, Path path) throws IOException; 141 } 142 143 private void testComputeHDFSBlocksDistribution( 144 HDFSBlockDistributionFunction fileToBlockDistribution) throws Exception { 145 final int DEFAULT_BLOCK_SIZE = 1024; 146 conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE); 147 MiniDFSCluster cluster = null; 148 Path testFile = null; 149 150 try { 151 // set up a cluster with 3 nodes 152 String hosts[] = new String[] { "host1", "host2", "host3" }; 153 cluster = htu.startMiniDFSCluster(hosts); 154 cluster.waitActive(); 155 FileSystem fs = cluster.getFileSystem(); 156 157 // create a file with two blocks 158 testFile = new Path("/test1.txt"); 159 WriteDataToHDFS(fs, testFile, 2 * DEFAULT_BLOCK_SIZE); 160 161 // given the default replication factor is 3, the same as the number of 162 // datanodes; the locality index for each host should be 100%, 163 // or getWeight for each host should be the same as getUniqueBlocksWeights 164 final long maxTime = EnvironmentEdgeManager.currentTime() + 2000; 165 boolean ok; 166 do { 167 ok = true; 168 169 HDFSBlocksDistribution blocksDistribution = 170 fileToBlockDistribution.getForPath(fs, testFile); 171 172 long uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight(); 173 for (String host : hosts) { 174 long weight = blocksDistribution.getWeight(host); 175 ok = (ok && uniqueBlocksTotalWeight == weight); 176 } 177 } while (!ok && EnvironmentEdgeManager.currentTime() < maxTime); 178 assertTrue(ok); 179 } finally { 180 htu.shutdownMiniDFSCluster(); 181 } 182 183 try { 184 // set up a cluster with 4 nodes 185 String hosts[] = new String[] { "host1", "host2", "host3", "host4" }; 186 cluster = htu.startMiniDFSCluster(hosts); 187 cluster.waitActive(); 188 FileSystem fs = cluster.getFileSystem(); 189 190 // create a file with three blocks 191 testFile = new Path("/test2.txt"); 192 WriteDataToHDFS(fs, testFile, 3 * DEFAULT_BLOCK_SIZE); 193 194 // given the default replication factor is 3, we will have total of 9 195 // replica of blocks; thus the host with the highest weight should have 196 // weight == 3 * DEFAULT_BLOCK_SIZE 197 final long maxTime = EnvironmentEdgeManager.currentTime() + 2000; 198 long weight; 199 long uniqueBlocksTotalWeight; 200 do { 201 HDFSBlocksDistribution blocksDistribution = 202 fileToBlockDistribution.getForPath(fs, testFile); 203 uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight(); 204 205 String tophost = blocksDistribution.getTopHosts().get(0); 206 weight = blocksDistribution.getWeight(tophost); 207 208 // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175 209 } while (uniqueBlocksTotalWeight != weight && EnvironmentEdgeManager.currentTime() < maxTime); 210 assertTrue(uniqueBlocksTotalWeight == weight); 211 212 } finally { 213 htu.shutdownMiniDFSCluster(); 214 } 215 216 try { 217 // set up a cluster with 4 nodes 218 String hosts[] = new String[] { "host1", "host2", "host3", "host4" }; 219 cluster = htu.startMiniDFSCluster(hosts); 220 cluster.waitActive(); 221 FileSystem fs = cluster.getFileSystem(); 222 223 // create a file with one block 224 testFile = new Path("/test3.txt"); 225 WriteDataToHDFS(fs, testFile, DEFAULT_BLOCK_SIZE); 226 227 // given the default replication factor is 3, we will have total of 3 228 // replica of blocks; thus there is one host without weight 229 final long maxTime = EnvironmentEdgeManager.currentTime() + 2000; 230 HDFSBlocksDistribution blocksDistribution; 231 do { 232 blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile); 233 // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175 234 } while ( 235 blocksDistribution.getTopHosts().size() != 3 236 && EnvironmentEdgeManager.currentTime() < maxTime 237 ); 238 assertEquals("Wrong number of hosts distributing blocks.", 3, 239 blocksDistribution.getTopHosts().size()); 240 } finally { 241 htu.shutdownMiniDFSCluster(); 242 } 243 } 244 245 private void writeVersionFile(Path versionFile, String version) throws IOException { 246 if (CommonFSUtils.isExists(fs, versionFile)) { 247 assertTrue(CommonFSUtils.delete(fs, versionFile, true)); 248 } 249 try (FSDataOutputStream s = fs.create(versionFile)) { 250 s.writeUTF(version); 251 } 252 assertTrue(fs.exists(versionFile)); 253 } 254 255 @Test 256 public void testVersion() throws DeserializationException, IOException { 257 final Path rootdir = htu.getDataTestDir(); 258 final FileSystem fs = rootdir.getFileSystem(conf); 259 assertNull(FSUtils.getVersion(fs, rootdir)); 260 // No meta dir so no complaint from checkVersion. 261 // Presumes it a new install. Will create version file. 262 FSUtils.checkVersion(fs, rootdir, true); 263 // Now remove the version file and create a metadir so checkVersion fails. 264 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 265 assertTrue(CommonFSUtils.isExists(fs, versionFile)); 266 assertTrue(CommonFSUtils.delete(fs, versionFile, true)); 267 Path metaRegionDir = 268 FSUtils.getRegionDirFromRootDir(rootdir, RegionInfoBuilder.FIRST_META_REGIONINFO); 269 FsPermission defaultPerms = 270 CommonFSUtils.getFilePermissions(fs, this.conf, HConstants.DATA_FILE_UMASK_KEY); 271 CommonFSUtils.create(fs, metaRegionDir, defaultPerms, false); 272 boolean thrown = false; 273 try { 274 FSUtils.checkVersion(fs, rootdir, true); 275 } catch (FileSystemVersionException e) { 276 thrown = true; 277 } 278 assertTrue("Expected FileSystemVersionException", thrown); 279 // Write out a good version file. See if we can read it in and convert. 280 String version = HConstants.FILE_SYSTEM_VERSION; 281 writeVersionFile(versionFile, version); 282 FileStatus[] status = fs.listStatus(versionFile); 283 assertNotNull(status); 284 assertTrue(status.length > 0); 285 String newVersion = FSUtils.getVersion(fs, rootdir); 286 assertEquals(version.length(), newVersion.length()); 287 assertEquals(version, newVersion); 288 // File will have been converted. Exercise the pb format 289 assertEquals(version, FSUtils.getVersion(fs, rootdir)); 290 FSUtils.checkVersion(fs, rootdir, true); 291 // Write an old version file. 292 String oldVersion = "1"; 293 writeVersionFile(versionFile, oldVersion); 294 newVersion = FSUtils.getVersion(fs, rootdir); 295 assertNotEquals(version, newVersion); 296 thrown = false; 297 try { 298 FSUtils.checkVersion(fs, rootdir, true); 299 } catch (FileSystemVersionException e) { 300 thrown = true; 301 } 302 assertTrue("Expected FileSystemVersionException", thrown); 303 } 304 305 @Test 306 public void testPermMask() throws Exception { 307 final Path rootdir = htu.getDataTestDir(); 308 final FileSystem fs = rootdir.getFileSystem(conf); 309 // default fs permission 310 FsPermission defaultFsPerm = 311 CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); 312 // 'hbase.data.umask.enable' is false. We will get default fs permission. 313 assertEquals(FsPermission.getFileDefault(), defaultFsPerm); 314 315 conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true); 316 // first check that we don't crash if we don't have perms set 317 FsPermission defaultStartPerm = 318 CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); 319 // default 'hbase.data.umask'is 000, and this umask will be used when 320 // 'hbase.data.umask.enable' is true. 321 // Therefore we will not get the real fs default in this case. 322 // Instead we will get the starting point FULL_RWX_PERMISSIONS 323 assertEquals(new FsPermission(CommonFSUtils.FULL_RWX_PERMISSIONS), defaultStartPerm); 324 325 conf.setStrings(HConstants.DATA_FILE_UMASK_KEY, "077"); 326 // now check that we get the right perms 327 FsPermission filePerm = 328 CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); 329 assertEquals(new FsPermission("700"), filePerm); 330 331 // then that the correct file is created 332 Path p = new Path("target" + File.separator + HBaseTestingUtil.getRandomUUID().toString()); 333 try { 334 FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null); 335 out.close(); 336 FileStatus stat = fs.getFileStatus(p); 337 assertEquals(new FsPermission("700"), stat.getPermission()); 338 // and then cleanup 339 } finally { 340 fs.delete(p, true); 341 } 342 } 343 344 @Test 345 public void testDeleteAndExists() throws Exception { 346 final Path rootdir = htu.getDataTestDir(); 347 final FileSystem fs = rootdir.getFileSystem(conf); 348 conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true); 349 FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); 350 // then that the correct file is created 351 String file = HBaseTestingUtil.getRandomUUID().toString(); 352 Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file); 353 Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file); 354 try { 355 FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null); 356 out.close(); 357 assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p)); 358 // delete the file with recursion as false. Only the file will be deleted. 359 CommonFSUtils.delete(fs, p, false); 360 // Create another file 361 FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null); 362 out1.close(); 363 // delete the file with recursion as false. Still the file only will be deleted 364 CommonFSUtils.delete(fs, p1, true); 365 assertFalse("The created file should be present", CommonFSUtils.isExists(fs, p1)); 366 // and then cleanup 367 } finally { 368 CommonFSUtils.delete(fs, p, true); 369 CommonFSUtils.delete(fs, p1, true); 370 } 371 } 372 373 @Test 374 public void testFilteredStatusDoesNotThrowOnNotFound() throws Exception { 375 MiniDFSCluster cluster = htu.startMiniDFSCluster(1); 376 try { 377 assertNull(FSUtils.listStatusWithStatusFilter(cluster.getFileSystem(), 378 new Path("definitely/doesn't/exist"), null)); 379 } finally { 380 cluster.shutdown(); 381 } 382 383 } 384 385 @Test 386 public void testRenameAndSetModifyTime() throws Exception { 387 MiniDFSCluster cluster = htu.startMiniDFSCluster(1); 388 assertTrue(CommonFSUtils.isHDFS(conf)); 389 390 FileSystem fs = FileSystem.get(conf); 391 Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile"); 392 393 String file = HBaseTestingUtil.getRandomUUID().toString(); 394 Path p = new Path(testDir, file); 395 396 FSDataOutputStream out = fs.create(p); 397 out.close(); 398 assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p)); 399 400 long expect = EnvironmentEdgeManager.currentTime() + 1000; 401 assertNotEquals(expect, fs.getFileStatus(p).getModificationTime()); 402 403 ManualEnvironmentEdge mockEnv = new ManualEnvironmentEdge(); 404 mockEnv.setValue(expect); 405 EnvironmentEdgeManager.injectEdge(mockEnv); 406 try { 407 String dstFile = HBaseTestingUtil.getRandomUUID().toString(); 408 Path dst = new Path(testDir, dstFile); 409 410 assertTrue(CommonFSUtils.renameAndSetModifyTime(fs, p, dst)); 411 assertFalse("The moved file should not be present", CommonFSUtils.isExists(fs, p)); 412 assertTrue("The dst file should be present", CommonFSUtils.isExists(fs, dst)); 413 414 assertEquals(expect, fs.getFileStatus(dst).getModificationTime()); 415 cluster.shutdown(); 416 } finally { 417 EnvironmentEdgeManager.reset(); 418 } 419 } 420 421 @Test 422 public void testSetStoragePolicyDefault() throws Exception { 423 verifyNoHDFSApiInvocationForDefaultPolicy(); 424 verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY); 425 } 426 427 /** 428 * Note: currently the default policy is set to defer to HDFS and this case is to verify the 429 * logic, will need to remove the check if the default policy is changed 430 */ 431 private void verifyNoHDFSApiInvocationForDefaultPolicy() throws URISyntaxException, IOException { 432 FileSystem testFs = new AlwaysFailSetStoragePolicyFileSystem(); 433 testFs.initialize(new URI("hdfs://localhost/"), conf); 434 // There should be no exception thrown when setting to default storage policy, which indicates 435 // the HDFS API hasn't been called 436 try { 437 CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"), 438 HConstants.DEFAULT_WAL_STORAGE_POLICY, true); 439 } catch (IOException e) { 440 Assert.fail("Should have bypassed the FS API when setting default storage policy"); 441 } 442 // There should be exception thrown when given non-default storage policy, which indicates the 443 // HDFS API has been called 444 try { 445 CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"), "HOT", true); 446 Assert.fail("Should have invoked the FS API but haven't"); 447 } catch (IOException e) { 448 // expected given an invalid path 449 } 450 } 451 452 class AlwaysFailSetStoragePolicyFileSystem extends DistributedFileSystem { 453 @Override 454 public void setStoragePolicy(final Path src, final String policyName) throws IOException { 455 throw new IOException("The setStoragePolicy method is invoked"); 456 } 457 } 458 459 /* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */ 460 @Test 461 public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception { 462 verifyFileInDirWithStoragePolicy("ALL_SSD"); 463 } 464 465 final String INVALID_STORAGE_POLICY = "1772"; 466 467 /* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */ 468 @Test 469 public void testSetStoragePolicyInvalid() throws Exception { 470 verifyFileInDirWithStoragePolicy(INVALID_STORAGE_POLICY); 471 } 472 473 // Here instead of TestCommonFSUtils because we need a minicluster 474 private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception { 475 conf.set(HConstants.WAL_STORAGE_POLICY, policy); 476 477 MiniDFSCluster cluster = htu.startMiniDFSCluster(1); 478 try { 479 assertTrue(CommonFSUtils.isHDFS(conf)); 480 481 FileSystem fs = FileSystem.get(conf); 482 Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile"); 483 fs.mkdirs(testDir); 484 485 String storagePolicy = 486 conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); 487 CommonFSUtils.setStoragePolicy(fs, testDir, storagePolicy); 488 489 String file = HBaseTestingUtil.getRandomUUID().toString(); 490 Path p = new Path(testDir, file); 491 WriteDataToHDFS(fs, p, 4096); 492 HFileSystem hfs = new HFileSystem(fs); 493 String policySet = hfs.getStoragePolicyName(p); 494 LOG.debug("The storage policy of path " + p + " is " + policySet); 495 if ( 496 policy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY) 497 || policy.equals(INVALID_STORAGE_POLICY) 498 ) { 499 String hdfsDefaultPolicy = hfs.getStoragePolicyName(hfs.getHomeDirectory()); 500 LOG.debug("The default hdfs storage policy (indicated by home path: " 501 + hfs.getHomeDirectory() + ") is " + hdfsDefaultPolicy); 502 Assert.assertEquals(hdfsDefaultPolicy, policySet); 503 } else { 504 Assert.assertEquals(policy, policySet); 505 } 506 // will assert existence before deleting. 507 cleanupFile(fs, testDir); 508 } finally { 509 cluster.shutdown(); 510 } 511 } 512 513 /** 514 * Ugly test that ensures we can get at the hedged read counters in dfsclient. Does a bit of 515 * preading with hedged reads enabled using code taken from hdfs TestPread. 516 */ 517 @Test 518 public void testDFSHedgedReadMetrics() throws Exception { 519 // Enable hedged reads and set it so the threshold is really low. 520 // Most of this test is taken from HDFS, from TestPread. 521 conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5); 522 conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0); 523 conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096); 524 conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096); 525 // Set short retry timeouts so this test runs faster 526 conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0); 527 conf.setBoolean("dfs.datanode.transferTo.allowed", false); 528 MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); 529 // Get the metrics. Should be empty. 530 DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf); 531 assertEquals(0, metrics.getHedgedReadOps()); 532 FileSystem fileSys = cluster.getFileSystem(); 533 try { 534 Path p = new Path("preadtest.dat"); 535 // We need > 1 blocks to test out the hedged reads. 536 DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize, blockSize, (short) 3, 537 seed); 538 pReadFile(fileSys, p); 539 cleanupFile(fileSys, p); 540 assertTrue(metrics.getHedgedReadOps() > 0); 541 } finally { 542 fileSys.close(); 543 cluster.shutdown(); 544 } 545 } 546 547 @Test 548 public void testCopyFilesParallel() throws Exception { 549 MiniDFSCluster cluster = htu.startMiniDFSCluster(1); 550 cluster.waitActive(); 551 FileSystem fs = cluster.getFileSystem(); 552 Path src = new Path("/src"); 553 fs.mkdirs(src); 554 for (int i = 0; i < 50; i++) { 555 WriteDataToHDFS(fs, new Path(src, String.valueOf(i)), 1024); 556 } 557 Path sub = new Path(src, "sub"); 558 fs.mkdirs(sub); 559 for (int i = 0; i < 50; i++) { 560 WriteDataToHDFS(fs, new Path(sub, String.valueOf(i)), 1024); 561 } 562 Path dst = new Path("/dst"); 563 List<Path> allFiles = FSUtils.copyFilesParallel(fs, src, fs, dst, conf, 4); 564 565 assertEquals(102, allFiles.size()); 566 FileStatus[] list = fs.listStatus(dst); 567 assertEquals(51, list.length); 568 FileStatus[] sublist = fs.listStatus(new Path(dst, "sub")); 569 assertEquals(50, sublist.length); 570 } 571 572 // Below is taken from TestPread over in HDFS. 573 static final int blockSize = 4096; 574 static final long seed = 0xDEADBEEFL; 575 private Random rand = new Random(); // This test depends on Random#setSeed 576 577 private void pReadFile(FileSystem fileSys, Path name) throws IOException { 578 FSDataInputStream stm = fileSys.open(name); 579 byte[] expected = new byte[12 * blockSize]; 580 rand.setSeed(seed); 581 rand.nextBytes(expected); 582 // do a sanity check. Read first 4K bytes 583 byte[] actual = new byte[4096]; 584 stm.readFully(actual); 585 checkAndEraseData(actual, 0, expected, "Read Sanity Test"); 586 // now do a pread for the first 8K bytes 587 actual = new byte[8192]; 588 doPread(stm, 0L, actual, 0, 8192); 589 checkAndEraseData(actual, 0, expected, "Pread Test 1"); 590 // Now check to see if the normal read returns 4K-8K byte range 591 actual = new byte[4096]; 592 stm.readFully(actual); 593 checkAndEraseData(actual, 4096, expected, "Pread Test 2"); 594 // Now see if we can cross a single block boundary successfully 595 // read 4K bytes from blockSize - 2K offset 596 stm.readFully(blockSize - 2048, actual, 0, 4096); 597 checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3"); 598 // now see if we can cross two block boundaries successfully 599 // read blockSize + 4K bytes from blockSize - 2K offset 600 actual = new byte[blockSize + 4096]; 601 stm.readFully(blockSize - 2048, actual); 602 checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4"); 603 // now see if we can cross two block boundaries that are not cached 604 // read blockSize + 4K bytes from 10*blockSize - 2K offset 605 actual = new byte[blockSize + 4096]; 606 stm.readFully(10 * blockSize - 2048, actual); 607 checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5"); 608 // now check that even after all these preads, we can still read 609 // bytes 8K-12K 610 actual = new byte[4096]; 611 stm.readFully(actual); 612 checkAndEraseData(actual, 8192, expected, "Pread Test 6"); 613 // done 614 stm.close(); 615 // check block location caching 616 stm = fileSys.open(name); 617 stm.readFully(1, actual, 0, 4096); 618 stm.readFully(4 * blockSize, actual, 0, 4096); 619 stm.readFully(7 * blockSize, actual, 0, 4096); 620 actual = new byte[3 * 4096]; 621 stm.readFully(0 * blockSize, actual, 0, 3 * 4096); 622 checkAndEraseData(actual, 0, expected, "Pread Test 7"); 623 actual = new byte[8 * 4096]; 624 stm.readFully(3 * blockSize, actual, 0, 8 * 4096); 625 checkAndEraseData(actual, 3 * blockSize, expected, "Pread Test 8"); 626 // read the tail 627 stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize / 2); 628 IOException res = null; 629 try { // read beyond the end of the file 630 stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize); 631 } catch (IOException e) { 632 // should throw an exception 633 res = e; 634 } 635 assertTrue("Error reading beyond file boundary.", res != null); 636 637 stm.close(); 638 } 639 640 private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) { 641 for (int idx = 0; idx < actual.length; idx++) { 642 assertEquals(message + " byte " + (from + idx) + " differs. expected " + expected[from + idx] 643 + " actual " + actual[idx], actual[idx], expected[from + idx]); 644 actual[idx] = 0; 645 } 646 } 647 648 private void doPread(FSDataInputStream stm, long position, byte[] buffer, int offset, int length) 649 throws IOException { 650 int nread = 0; 651 // long totalRead = 0; 652 // DFSInputStream dfstm = null; 653 654 /* 655 * Disable. This counts do not add up. Some issue in original hdfs tests? if 656 * (stm.getWrappedStream() instanceof DFSInputStream) { dfstm = (DFSInputStream) 657 * (stm.getWrappedStream()); totalRead = dfstm.getReadStatistics().getTotalBytesRead(); } 658 */ 659 660 while (nread < length) { 661 int nbytes = stm.read(position + nread, buffer, offset + nread, length - nread); 662 assertTrue("Error in pread", nbytes > 0); 663 nread += nbytes; 664 } 665 666 /* 667 * Disable. This counts do not add up. Some issue in original hdfs tests? if (dfstm != null) { 668 * if (isHedgedRead) { assertTrue("Expected read statistic to be incremented", length <= 669 * dfstm.getReadStatistics().getTotalBytesRead() - totalRead); } else { 670 * assertEquals("Expected read statistic to be incremented", length, dfstm 671 * .getReadStatistics().getTotalBytesRead() - totalRead); } } 672 */ 673 } 674 675 private void cleanupFile(FileSystem fileSys, Path name) throws IOException { 676 assertTrue(fileSys.exists(name)); 677 assertTrue(fileSys.delete(name, true)); 678 assertTrue(!fileSys.exists(name)); 679 } 680 681 static { 682 try { 683 Class.forName("org.apache.hadoop.fs.StreamCapabilities"); 684 LOG.debug("Test thought StreamCapabilities class was present."); 685 } catch (ClassNotFoundException exception) { 686 LOG.debug("Test didn't think StreamCapabilities class was present."); 687 } 688 } 689 690 // Here instead of TestCommonFSUtils because we need a minicluster 691 @Test 692 public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception { 693 MiniDFSCluster cluster = htu.startMiniDFSCluster(1); 694 try (FileSystem filesystem = cluster.getFileSystem()) { 695 FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar")); 696 assertTrue(stream.hasCapability(StreamCapabilities.HSYNC)); 697 assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH)); 698 assertFalse(stream.hasCapability("a capability that hopefully HDFS doesn't add.")); 699 } finally { 700 cluster.shutdown(); 701 } 702 } 703 704 private void testIsSameHdfs(int nnport) throws IOException { 705 Configuration conf = HBaseConfiguration.create(); 706 Path srcPath = new Path("hdfs://localhost:" + nnport + "/"); 707 Path desPath = new Path("hdfs://127.0.0.1/"); 708 FileSystem srcFs = srcPath.getFileSystem(conf); 709 FileSystem desFs = desPath.getFileSystem(conf); 710 711 assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs)); 712 713 desPath = new Path("hdfs://127.0.0.1:8070/"); 714 desFs = desPath.getFileSystem(conf); 715 assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs)); 716 717 desPath = new Path("hdfs://127.0.1.1:" + nnport + "/"); 718 desFs = desPath.getFileSystem(conf); 719 assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs)); 720 721 conf.set("fs.defaultFS", "hdfs://haosong-hadoop"); 722 conf.set("dfs.nameservices", "haosong-hadoop"); 723 conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2"); 724 conf.set("dfs.client.failover.proxy.provider.haosong-hadoop", 725 "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); 726 727 conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:" + nnport); 728 conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000"); 729 desPath = new Path("/"); 730 desFs = desPath.getFileSystem(conf); 731 assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs)); 732 733 conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:" + nnport); 734 conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000"); 735 desPath = new Path("/"); 736 desFs = desPath.getFileSystem(conf); 737 assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs)); 738 } 739 740 @Test 741 public void testIsSameHdfs() throws IOException { 742 String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion(); 743 LOG.info("hadoop version is: " + hadoopVersion); 744 boolean isHadoop3_0_0 = hadoopVersion.startsWith("3.0.0"); 745 if (isHadoop3_0_0) { 746 // Hadoop 3.0.0 alpha1+ ~ 3.0.0 GA changed default nn port to 9820. 747 // See HDFS-9427 748 testIsSameHdfs(9820); 749 } else { 750 // pre hadoop 3.0.0 defaults to port 8020 751 // Hadoop 3.0.1 changed it back to port 8020. See HDFS-12990 752 testIsSameHdfs(8020); 753 } 754 } 755}