/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test {@link FSUtils}.
 */
@Category({ MiscTests.class, MediumTests.class })
public class TestFSUtils {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFSUtils.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFSUtils.class);

  private HBaseTestingUtility htu;
  private FileSystem fs;
  private Configuration conf;

  @Before
  public void setUp() throws IOException {
    htu = new HBaseTestingUtility();
    fs = htu.getTestFileSystem();
    conf = htu.getConfiguration();
  }

  @Test
  public void testIsHDFS() throws Exception {
    assertFalse(CommonFSUtils.isHDFS(conf));
    MiniDFSCluster cluster = null;
    try {
      cluster = htu.startMiniDFSCluster(1);
      assertTrue(CommonFSUtils.isHDFS(conf));
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  // Writes dataSize zero bytes to the given file and closes the stream.
  private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) throws Exception {
    FSDataOutputStream out = fs.create(file);
    byte[] data = new byte[dataSize];
    out.write(data, 0, dataSize);
    out.close();
  }

  @Test
  public void testComputeHDFSBlocksDistributionByInputStream() throws Exception {
    testComputeHDFSBlocksDistribution((fs, testFile) -> {
      try (FSDataInputStream open = fs.open(testFile)) {
        assertTrue(open instanceof HdfsDataInputStream);
        return FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) open);
      }
    });
  }

  @Test
  public void testComputeHDFSBlockDistribution() throws Exception {
    testComputeHDFSBlocksDistribution((fs, testFile) -> {
      FileStatus status = fs.getFileStatus(testFile);
      return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
    });
  }

  @FunctionalInterface
  interface HDFSBlockDistributionFunction {
    HDFSBlocksDistribution getForPath(FileSystem fs, Path path) throws IOException;
  }

  private void testComputeHDFSBlocksDistribution(
    HDFSBlockDistributionFunction fileToBlockDistribution) throws Exception {
    final int DEFAULT_BLOCK_SIZE = 1024;
    conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
    MiniDFSCluster cluster = null;
    Path testFile = null;

    try {
      // set up a cluster with 3 nodes
      String hosts[] = new String[] { "host1", "host2", "host3" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with two blocks
      testFile = new Path("/test1.txt");
      WriteDataToHDFS(fs, testFile, 2 * DEFAULT_BLOCK_SIZE);

      // given that the default replication factor is 3, the same as the number of
      // datanodes, the locality index for each host should be 100%,
      // i.e. getWeight for each host should equal getUniqueBlocksTotalWeight
      final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
      boolean ok;
      do {
        ok = true;

        HDFSBlocksDistribution blocksDistribution =
          fileToBlockDistribution.getForPath(fs, testFile);

        long uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();
        for (String host : hosts) {
          long weight = blocksDistribution.getWeight(host);
          ok = (ok && uniqueBlocksTotalWeight == weight);
        }
      } while (!ok && EnvironmentEdgeManager.currentTime() < maxTime);
      assertTrue(ok);
    } finally {
      htu.shutdownMiniDFSCluster();
    }

    try {
      // set up a cluster with 4 nodes
      String hosts[] = new String[] { "host1", "host2", "host3", "host4" };
"host4" }; 173 cluster = htu.startMiniDFSCluster(hosts); 174 cluster.waitActive(); 175 FileSystem fs = cluster.getFileSystem(); 176 177 // create a file with three blocks 178 testFile = new Path("/test2.txt"); 179 WriteDataToHDFS(fs, testFile, 3 * DEFAULT_BLOCK_SIZE); 180 181 // given the default replication factor is 3, we will have total of 9 182 // replica of blocks; thus the host with the highest weight should have 183 // weight == 3 * DEFAULT_BLOCK_SIZE 184 final long maxTime = EnvironmentEdgeManager.currentTime() + 2000; 185 long weight; 186 long uniqueBlocksTotalWeight; 187 do { 188 HDFSBlocksDistribution blocksDistribution = 189 fileToBlockDistribution.getForPath(fs, testFile); 190 uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight(); 191 192 String tophost = blocksDistribution.getTopHosts().get(0); 193 weight = blocksDistribution.getWeight(tophost); 194 195 // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175 196 } while (uniqueBlocksTotalWeight != weight && EnvironmentEdgeManager.currentTime() < maxTime); 197 assertTrue(uniqueBlocksTotalWeight == weight); 198 199 } finally { 200 htu.shutdownMiniDFSCluster(); 201 } 202 203 try { 204 // set up a cluster with 4 nodes 205 String hosts[] = new String[] { "host1", "host2", "host3", "host4" }; 206 cluster = htu.startMiniDFSCluster(hosts); 207 cluster.waitActive(); 208 FileSystem fs = cluster.getFileSystem(); 209 210 // create a file with one block 211 testFile = new Path("/test3.txt"); 212 WriteDataToHDFS(fs, testFile, DEFAULT_BLOCK_SIZE); 213 214 // given the default replication factor is 3, we will have total of 3 215 // replica of blocks; thus there is one host without weight 216 final long maxTime = EnvironmentEdgeManager.currentTime() + 2000; 217 HDFSBlocksDistribution blocksDistribution; 218 do { 219 blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile); 220 // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175 221 } while ( 222 blocksDistribution.getTopHosts().size() != 3 223 && EnvironmentEdgeManager.currentTime() < maxTime 224 ); 225 assertEquals("Wrong number of hosts distributing blocks.", 3, 226 blocksDistribution.getTopHosts().size()); 227 } finally { 228 htu.shutdownMiniDFSCluster(); 229 } 230 } 231 232 private void writeVersionFile(Path versionFile, String version) throws IOException { 233 if (CommonFSUtils.isExists(fs, versionFile)) { 234 assertTrue(CommonFSUtils.delete(fs, versionFile, true)); 235 } 236 try (FSDataOutputStream s = fs.create(versionFile)) { 237 s.writeUTF(version); 238 } 239 assertTrue(fs.exists(versionFile)); 240 } 241 242 @Test 243 public void testVersion() throws DeserializationException, IOException { 244 final Path rootdir = htu.getDataTestDir(); 245 final FileSystem fs = rootdir.getFileSystem(conf); 246 assertNull(FSUtils.getVersion(fs, rootdir)); 247 // No meta dir so no complaint from checkVersion. 248 // Presumes it a new install. Will create version file. 249 FSUtils.checkVersion(fs, rootdir, true); 250 // Now remove the version file and create a metadir so checkVersion fails. 
    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    assertTrue(CommonFSUtils.isExists(fs, versionFile));
    assertTrue(CommonFSUtils.delete(fs, versionFile, true));
    Path metaRegionDir =
      FSUtils.getRegionDirFromRootDir(rootdir, RegionInfoBuilder.FIRST_META_REGIONINFO);
    FsPermission defaultPerms =
      CommonFSUtils.getFilePermissions(fs, this.conf, HConstants.DATA_FILE_UMASK_KEY);
    CommonFSUtils.create(fs, metaRegionDir, defaultPerms, false);
    boolean thrown = false;
    try {
      FSUtils.checkVersion(fs, rootdir, true);
    } catch (FileSystemVersionException e) {
      thrown = true;
    }
    assertTrue("Expected FileSystemVersionException", thrown);
    // Write out a good version file. See if we can read it in and convert.
    String version = HConstants.FILE_SYSTEM_VERSION;
    writeVersionFile(versionFile, version);
    FileStatus[] status = fs.listStatus(versionFile);
    assertNotNull(status);
    assertTrue(status.length > 0);
    String newVersion = FSUtils.getVersion(fs, rootdir);
    assertEquals(version.length(), newVersion.length());
    assertEquals(version, newVersion);
    // File will have been converted. Exercise the pb format
    assertEquals(version, FSUtils.getVersion(fs, rootdir));
    FSUtils.checkVersion(fs, rootdir, true);
    // Write an old version file.
    String oldVersion = "1";
    writeVersionFile(versionFile, oldVersion);
    newVersion = FSUtils.getVersion(fs, rootdir);
    assertNotEquals(version, newVersion);
    thrown = false;
    try {
      FSUtils.checkVersion(fs, rootdir, true);
    } catch (FileSystemVersionException e) {
      thrown = true;
    }
    assertTrue("Expected FileSystemVersionException", thrown);
  }

  @Test
  public void testPermMask() throws Exception {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    // default fs permission
    FsPermission defaultFsPerm =
      CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    // 'hbase.data.umask.enable' is false. We will get the default fs permission.
    assertEquals(FsPermission.getFileDefault(), defaultFsPerm);

    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
    // first check that we don't crash if we don't have perms set
    FsPermission defaultStartPerm =
      CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    // the default 'hbase.data.umask' is 000, and this umask is used when
    // 'hbase.data.umask.enable' is true.
    // Therefore we will not get the real fs default in this case.
    // Instead we will get the starting point FULL_RWX_PERMISSIONS
    assertEquals(new FsPermission(CommonFSUtils.FULL_RWX_PERMISSIONS), defaultStartPerm);

    conf.setStrings(HConstants.DATA_FILE_UMASK_KEY, "077");
    // now check that we get the right perms
    FsPermission filePerm =
      CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    assertEquals(new FsPermission("700"), filePerm);

    // then that the correct file is created
    Path p = new Path("target" + File.separator + HBaseTestingUtility.getRandomUUID().toString());
    try {
      FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null);
      out.close();
      FileStatus stat = fs.getFileStatus(p);
      assertEquals(new FsPermission("700"), stat.getPermission());
      // and then cleanup
    } finally {
      fs.delete(p, true);
    }
  }

  @Test
  public void testDeleteAndExists() throws Exception {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    // then that the correct file is created
    String file = HBaseTestingUtility.getRandomUUID().toString();
    Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
    Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
    try {
      FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null);
      out.close();
      assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p));
      // delete the file with recursion as false. Only the file will be deleted.
      CommonFSUtils.delete(fs, p, false);
      // Create another file
      FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null);
      out1.close();
      // delete the file with recursion as true. Still only the file will be deleted.
      CommonFSUtils.delete(fs, p1, true);
      assertFalse("The deleted file should not be present", CommonFSUtils.isExists(fs, p1));
      // and then cleanup
    } finally {
      CommonFSUtils.delete(fs, p, true);
      CommonFSUtils.delete(fs, p1, true);
    }
  }

  @Test
  public void testFilteredStatusDoesNotThrowOnNotFound() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try {
      assertNull(FSUtils.listStatusWithStatusFilter(cluster.getFileSystem(),
        new Path("definitely/doesn't/exist"), null));
    } finally {
      cluster.shutdown();
    }
  }

  @Test
  public void testRenameAndSetModifyTime() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    assertTrue(CommonFSUtils.isHDFS(conf));

    FileSystem fs = FileSystem.get(conf);
    Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");

    String file = HBaseTestingUtility.getRandomUUID().toString();
    Path p = new Path(testDir, file);

    FSDataOutputStream out = fs.create(p);
    out.close();
    assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p));

    long expect = EnvironmentEdgeManager.currentTime() + 1000;
    assertNotEquals(expect, fs.getFileStatus(p).getModificationTime());

    ManualEnvironmentEdge mockEnv = new ManualEnvironmentEdge();
    mockEnv.setValue(expect);
    EnvironmentEdgeManager.injectEdge(mockEnv);
    try {
      String dstFile = HBaseTestingUtility.getRandomUUID().toString();
      Path dst = new Path(testDir, dstFile);

      assertTrue(CommonFSUtils.renameAndSetModifyTime(fs, p, dst));
      assertFalse("The moved file should not be present", CommonFSUtils.isExists(fs, p));
      assertTrue("The dst file should be present", CommonFSUtils.isExists(fs, dst));

      assertEquals(expect, fs.getFileStatus(dst).getModificationTime());
      cluster.shutdown();
    } finally {
      EnvironmentEdgeManager.reset();
    }
  }

  @Test
  public void testSetStoragePolicyDefault() throws Exception {
    verifyNoHDFSApiInvocationForDefaultPolicy();
    verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY);
  }

  /**
   * Note: currently the default policy is set to defer to HDFS, and this case verifies that
   * logic. The check will need to be removed if the default policy is ever changed.
   */
  private void verifyNoHDFSApiInvocationForDefaultPolicy() {
    FileSystem testFs = new AlwaysFailSetStoragePolicyFileSystem();
    // There should be no exception thrown when setting the default storage policy, which indicates
    // the HDFS API hasn't been called
    try {
      CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"),
        HConstants.DEFAULT_WAL_STORAGE_POLICY, true);
    } catch (IOException e) {
      Assert.fail("Should have bypassed the FS API when setting default storage policy");
    }
    // There should be an exception thrown when given a non-default storage policy, which indicates
    // the HDFS API has been called
    try {
      CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"), "HOT", true);
      Assert.fail("Should have invoked the FS API but haven't");
    } catch (IOException e) {
      // expected given an invalid path
    }
  }

  class AlwaysFailSetStoragePolicyFileSystem extends DistributedFileSystem {
    @Override
    public void setStoragePolicy(final Path src, final String policyName) throws IOException {
      throw new IOException("The setStoragePolicy method is invoked");
    }
  }

  /* might log a warning, but still works. (always warns on Hadoop < 2.6.0) */
  @Test
  public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception {
    verifyFileInDirWithStoragePolicy("ALL_SSD");
  }

  final String INVALID_STORAGE_POLICY = "1772";

  /* should log a warning, but still works. (different warning on Hadoop < 2.6.0) */
  @Test
  public void testSetStoragePolicyInvalid() throws Exception {
    verifyFileInDirWithStoragePolicy(INVALID_STORAGE_POLICY);
  }

  // Here instead of TestCommonFSUtils because we need a minicluster
  private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception {
    conf.set(HConstants.WAL_STORAGE_POLICY, policy);

    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try {
      assertTrue(CommonFSUtils.isHDFS(conf));

      FileSystem fs = FileSystem.get(conf);
      Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");
      fs.mkdirs(testDir);

      String storagePolicy =
        conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
      CommonFSUtils.setStoragePolicy(fs, testDir, storagePolicy);

      String file = HBaseTestingUtility.getRandomUUID().toString();
      Path p = new Path(testDir, file);
      WriteDataToHDFS(fs, p, 4096);
      HFileSystem hfs = new HFileSystem(fs);
      String policySet = hfs.getStoragePolicyName(p);
      LOG.debug("The storage policy of path " + p + " is " + policySet);
      if (
        policy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)
          || policy.equals(INVALID_STORAGE_POLICY)
      ) {
        String hdfsDefaultPolicy = hfs.getStoragePolicyName(hfs.getHomeDirectory());
        LOG.debug("The default hdfs storage policy (indicated by home path: "
          + hfs.getHomeDirectory() + ") is " + hdfsDefaultPolicy);
        Assert.assertEquals(hdfsDefaultPolicy, policySet);
      } else {
        Assert.assertEquals(policy, policySet);
      }
      // will assert existence before deleting.
      cleanupFile(fs, testDir);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Ugly test that ensures we can get at the hedged read counters in dfsclient. Does a bit of
   * preading with hedged reads enabled using code taken from hdfs TestPread.
   */
  @Test
  public void testDFSHedgedReadMetrics() throws Exception {
    // Enable hedged reads and set it so the threshold is really low.
    // Most of this test is taken from HDFS, from TestPread.
    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
    conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
    // Set short retry timeouts so this test runs faster
    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
    conf.setBoolean("dfs.datanode.transferTo.allowed", false);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    // Get the metrics. Should be empty.
    DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
    assertEquals(0, metrics.getHedgedReadOps());
    FileSystem fileSys = cluster.getFileSystem();
    try {
      Path p = new Path("preadtest.dat");
      // We need more than one block to test out the hedged reads.
      DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize, blockSize, (short) 3,
        seed);
      pReadFile(fileSys, p);
      cleanupFile(fileSys, p);
      assertTrue(metrics.getHedgedReadOps() > 0);
    } finally {
      fileSys.close();
      cluster.shutdown();
    }
  }

  @Test
  public void testCopyFilesParallel() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();
    Path src = new Path("/src");
    fs.mkdirs(src);
    for (int i = 0; i < 50; i++) {
      WriteDataToHDFS(fs, new Path(src, String.valueOf(i)), 1024);
    }
    Path sub = new Path(src, "sub");
    fs.mkdirs(sub);
    for (int i = 0; i < 50; i++) {
      WriteDataToHDFS(fs, new Path(sub, String.valueOf(i)), 1024);
    }
    Path dst = new Path("/dst");
    List<Path> allFiles = FSUtils.copyFilesParallel(fs, src, fs, dst, conf, 4);

    assertEquals(102, allFiles.size());
    FileStatus[] list = fs.listStatus(dst);
    assertEquals(51, list.length);
    FileStatus[] sublist = fs.listStatus(new Path(dst, "sub"));
    assertEquals(50, sublist.length);
  }

  // Below is taken from TestPread over in HDFS.
  static final int blockSize = 4096;
  static final long seed = 0xDEADBEEFL;
  private Random rand = new Random(); // This test depends on Random#setSeed

  private void pReadFile(FileSystem fileSys, Path name) throws IOException {
    FSDataInputStream stm = fileSys.open(name);
    byte[] expected = new byte[12 * blockSize];
    rand.setSeed(seed);
    rand.nextBytes(expected);
    // do a sanity check. Read first 4K bytes
    byte[] actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
    // now do a pread for the first 8K bytes
    actual = new byte[8192];
    doPread(stm, 0L, actual, 0, 8192);
    checkAndEraseData(actual, 0, expected, "Pread Test 1");
    // Now check to see if the normal read returns 4K-8K byte range
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 4096, expected, "Pread Test 2");
    // Now see if we can cross a single block boundary successfully
    // read 4K bytes from blockSize - 2K offset
    stm.readFully(blockSize - 2048, actual, 0, 4096);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
    // now see if we can cross two block boundaries successfully
    // read blockSize + 4K bytes from blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(blockSize - 2048, actual);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
    // now see if we can cross two block boundaries that are not cached
    // read blockSize + 4K bytes from 10*blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(10 * blockSize - 2048, actual);
    checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
    // now check that even after all these preads, we can still read
    // bytes 8K-12K
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 8192, expected, "Pread Test 6");
    // done
    stm.close();
    // check block location caching
    stm = fileSys.open(name);
    stm.readFully(1, actual, 0, 4096);
    stm.readFully(4 * blockSize, actual, 0, 4096);
    stm.readFully(7 * blockSize, actual, 0, 4096);
    actual = new byte[3 * 4096];
    stm.readFully(0 * blockSize, actual, 0, 3 * 4096);
    checkAndEraseData(actual, 0, expected, "Pread Test 7");
Test 7"); 609 actual = new byte[8 * 4096]; 610 stm.readFully(3 * blockSize, actual, 0, 8 * 4096); 611 checkAndEraseData(actual, 3 * blockSize, expected, "Pread Test 8"); 612 // read the tail 613 stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize / 2); 614 IOException res = null; 615 try { // read beyond the end of the file 616 stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize); 617 } catch (IOException e) { 618 // should throw an exception 619 res = e; 620 } 621 assertTrue("Error reading beyond file boundary.", res != null); 622 623 stm.close(); 624 } 625 626 private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) { 627 for (int idx = 0; idx < actual.length; idx++) { 628 assertEquals(message + " byte " + (from + idx) + " differs. expected " + expected[from + idx] 629 + " actual " + actual[idx], actual[idx], expected[from + idx]); 630 actual[idx] = 0; 631 } 632 } 633 634 private void doPread(FSDataInputStream stm, long position, byte[] buffer, int offset, int length) 635 throws IOException { 636 int nread = 0; 637 // long totalRead = 0; 638 // DFSInputStream dfstm = null; 639 640 /* 641 * Disable. This counts do not add up. Some issue in original hdfs tests? if 642 * (stm.getWrappedStream() instanceof DFSInputStream) { dfstm = (DFSInputStream) 643 * (stm.getWrappedStream()); totalRead = dfstm.getReadStatistics().getTotalBytesRead(); } 644 */ 645 646 while (nread < length) { 647 int nbytes = stm.read(position + nread, buffer, offset + nread, length - nread); 648 assertTrue("Error in pread", nbytes > 0); 649 nread += nbytes; 650 } 651 652 /* 653 * Disable. This counts do not add up. Some issue in original hdfs tests? if (dfstm != null) { 654 * if (isHedgedRead) { assertTrue("Expected read statistic to be incremented", length <= 655 * dfstm.getReadStatistics().getTotalBytesRead() - totalRead); } else { 656 * assertEquals("Expected read statistic to be incremented", length, dfstm 657 * .getReadStatistics().getTotalBytesRead() - totalRead); } } 658 */ 659 } 660 661 private void cleanupFile(FileSystem fileSys, Path name) throws IOException { 662 assertTrue(fileSys.exists(name)); 663 assertTrue(fileSys.delete(name, true)); 664 assertTrue(!fileSys.exists(name)); 665 } 666 667 static { 668 try { 669 Class.forName("org.apache.hadoop.fs.StreamCapabilities"); 670 LOG.debug("Test thought StreamCapabilities class was present."); 671 } catch (ClassNotFoundException exception) { 672 LOG.debug("Test didn't think StreamCapabilities class was present."); 673 } 674 } 675 676 // Here instead of TestCommonFSUtils because we need a minicluster 677 @Test 678 public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception { 679 MiniDFSCluster cluster = htu.startMiniDFSCluster(1); 680 try (FileSystem filesystem = cluster.getFileSystem()) { 681 FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar")); 682 assertTrue(stream.hasCapability(StreamCapabilities.HSYNC)); 683 assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH)); 684 assertFalse(stream.hasCapability("a capability that hopefully HDFS doesn't add.")); 685 } finally { 686 cluster.shutdown(); 687 } 688 } 689 690 private void testIsSameHdfs(int nnport) throws IOException { 691 Configuration conf = HBaseConfiguration.create(); 692 Path srcPath = new Path("hdfs://localhost:" + nnport + "/"); 693 Path desPath = new Path("hdfs://127.0.0.1/"); 694 FileSystem srcFs = srcPath.getFileSystem(conf); 695 FileSystem desFs = desPath.getFileSystem(conf); 696 697 
    assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));

    desPath = new Path("hdfs://127.0.0.1:8070/");
    desFs = desPath.getFileSystem(conf);
    assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));

    desPath = new Path("hdfs://127.0.1.1:" + nnport + "/");
    desFs = desPath.getFileSystem(conf);
    assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));

    conf.set("fs.defaultFS", "hdfs://haosong-hadoop");
    conf.set("dfs.nameservices", "haosong-hadoop");
    conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2");
    conf.set("dfs.client.failover.proxy.provider.haosong-hadoop",
      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:" + nnport);
    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000");
    desPath = new Path("/");
    desFs = desPath.getFileSystem(conf);
    assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));

    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:" + nnport);
    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000");
    desPath = new Path("/");
    desFs = desPath.getFileSystem(conf);
    assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));
  }

  @Test
  public void testIsSameHdfs() throws IOException {
    String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion();
    LOG.info("hadoop version is: " + hadoopVersion);
    boolean isHadoop3_0_0 = hadoopVersion.startsWith("3.0.0");
    if (isHadoop3_0_0) {
      // Hadoop 3.0.0 alpha1+ ~ 3.0.0 GA changed default nn port to 9820.
      // See HDFS-9427
      testIsSameHdfs(9820);
    } else {
      // pre hadoop 3.0.0 defaults to port 8020
      // Hadoop 3.0.1 changed it back to port 8020. See HDFS-12990
      testIsSameHdfs(8020);
    }
  }
}