/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test {@link FSUtils}.
 */
@Category({ MiscTests.class, MediumTests.class })
public class TestFSUtils {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFSUtils.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFSUtils.class);

  private HBaseTestingUtility htu;
  private FileSystem fs;
  private Configuration conf;

  @Before
  public void setUp() throws IOException {
    htu = new HBaseTestingUtility();
    fs = htu.getTestFileSystem();
    conf = htu.getConfiguration();
  }

  @Test
  public void testIsHDFS() throws Exception {
    assertFalse(CommonFSUtils.isHDFS(conf));
    MiniDFSCluster cluster = null;
    try {
      cluster = htu.startMiniDFSCluster(1);
      assertTrue(CommonFSUtils.isHDFS(conf));
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

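  /**
   * Helper that writes {@code dataSize} bytes of zeros to {@code file} on the given filesystem.
   */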
  private void writeDataToHDFS(FileSystem fs, Path file, int dataSize) throws Exception {
    byte[] data = new byte[dataSize];
    // Use try-with-resources so the stream is closed even if the write fails.
    try (FSDataOutputStream out = fs.create(file)) {
      out.write(data, 0, dataSize);
    }
  }

  @Test
  public void testComputeHDFSBlocksDistributionByInputStream() throws Exception {
    testComputeHDFSBlocksDistribution((fs, testFile) -> {
      try (FSDataInputStream open = fs.open(testFile)) {
        assertTrue(open instanceof HdfsDataInputStream);
        return FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) open);
      }
    });
  }

  @Test
  public void testComputeHDFSBlockDistribution() throws Exception {
    testComputeHDFSBlocksDistribution((fs, testFile) -> {
      FileStatus status = fs.getFileStatus(testFile);
      return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
    });
  }

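  /** Computes the {@link HDFSBlocksDistribution} for the file at the given path. */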
  @FunctionalInterface
  interface HDFSBlockDistributionFunction {
    HDFSBlocksDistribution getForPath(FileSystem fs, Path path) throws IOException;
  }

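  /**
   * Spins up miniclusters of three and four datanodes, writes files of one to three blocks, and
   * checks the distribution reported by the given function. Because the NameNode learns about
   * block locations asynchronously (see HBASE-6175), each check polls for up to two seconds.
   */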
  private void testComputeHDFSBlocksDistribution(
    HDFSBlockDistributionFunction fileToBlockDistribution) throws Exception {
    final int DEFAULT_BLOCK_SIZE = 1024;
    conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
    MiniDFSCluster cluster = null;
    Path testFile = null;

    try {
      // set up a cluster with 3 nodes
      String[] hosts = new String[] { "host1", "host2", "host3" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with two blocks
      testFile = new Path("/test1.txt");
      writeDataToHDFS(fs, testFile, 2 * DEFAULT_BLOCK_SIZE);

      // Given the default replication factor is 3, the same as the number of datanodes, the
      // locality index for each host should be 100%; that is, getWeight for each host should equal
      // getUniqueBlocksTotalWeight.
      final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
      boolean ok;
      do {
        ok = true;

        HDFSBlocksDistribution blocksDistribution =
          fileToBlockDistribution.getForPath(fs, testFile);

        long uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();
        for (String host : hosts) {
          long weight = blocksDistribution.getWeight(host);
          ok = (ok && uniqueBlocksTotalWeight == weight);
        }
      } while (!ok && EnvironmentEdgeManager.currentTime() < maxTime);
      assertTrue(ok);
    } finally {
      htu.shutdownMiniDFSCluster();
    }

    try {
      // set up a cluster with 4 nodes
      String[] hosts = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with three blocks
      testFile = new Path("/test2.txt");
      writeDataToHDFS(fs, testFile, 3 * DEFAULT_BLOCK_SIZE);

      // Given the default replication factor is 3, there are 9 block replicas in total; thus the
      // host with the highest weight should have weight == 3 * DEFAULT_BLOCK_SIZE.
      final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
      long weight;
      long uniqueBlocksTotalWeight;
      do {
        HDFSBlocksDistribution blocksDistribution =
          fileToBlockDistribution.getForPath(fs, testFile);
        uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();

        String tophost = blocksDistribution.getTopHosts().get(0);
        weight = blocksDistribution.getWeight(tophost);

        // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
      } while (uniqueBlocksTotalWeight != weight && EnvironmentEdgeManager.currentTime() < maxTime);
      assertEquals(uniqueBlocksTotalWeight, weight);
    } finally {
      htu.shutdownMiniDFSCluster();
    }

    try {
      // set up a cluster with 4 nodes
      String[] hosts = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with one block
      testFile = new Path("/test3.txt");
      writeDataToHDFS(fs, testFile, DEFAULT_BLOCK_SIZE);

      // Given the default replication factor is 3, there are 3 block replicas in total; thus one
      // of the four hosts carries no weight.
      final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
      HDFSBlocksDistribution blocksDistribution;
      do {
        blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile);
        // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
      } while (
        blocksDistribution.getTopHosts().size() != 3
          && EnvironmentEdgeManager.currentTime() < maxTime
      );
      assertEquals("Wrong number of hosts distributing blocks.", 3,
        blocksDistribution.getTopHosts().size());
    } finally {
      htu.shutdownMiniDFSCluster();
    }
  }

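  /** Replaces any existing file at {@code versionFile} with one containing {@code version}. */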
  private void writeVersionFile(Path versionFile, String version) throws IOException {
    if (CommonFSUtils.isExists(fs, versionFile)) {
      assertTrue(CommonFSUtils.delete(fs, versionFile, true));
    }
    try (FSDataOutputStream s = fs.create(versionFile)) {
      s.writeUTF(version);
    }
    assertTrue(fs.exists(versionFile));
  }

  @Test
  public void testVersion() throws DeserializationException, IOException {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    assertNull(FSUtils.getVersion(fs, rootdir));
    // No meta dir so no complaint from checkVersion.
    // Presumes it's a new install. Will create the version file.
    FSUtils.checkVersion(fs, rootdir, true);
    // Now remove the version file and create a metadir so checkVersion fails.
    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    assertTrue(CommonFSUtils.isExists(fs, versionFile));
    assertTrue(CommonFSUtils.delete(fs, versionFile, true));
    Path metaRegionDir =
      FSUtils.getRegionDirFromRootDir(rootdir, RegionInfoBuilder.FIRST_META_REGIONINFO);
    FsPermission defaultPerms =
      CommonFSUtils.getFilePermissions(fs, this.conf, HConstants.DATA_FILE_UMASK_KEY);
    CommonFSUtils.create(fs, metaRegionDir, defaultPerms, false);
    boolean thrown = false;
    try {
      FSUtils.checkVersion(fs, rootdir, true);
    } catch (FileSystemVersionException e) {
      thrown = true;
    }
    assertTrue("Expected FileSystemVersionException", thrown);
    // Write out a good version file. See if we can read it in and convert.
    String version = HConstants.FILE_SYSTEM_VERSION;
    writeVersionFile(versionFile, version);
    FileStatus[] status = fs.listStatus(versionFile);
    assertNotNull(status);
    assertTrue(status.length > 0);
    String newVersion = FSUtils.getVersion(fs, rootdir);
    assertEquals(version.length(), newVersion.length());
    assertEquals(version, newVersion);
    // File will have been converted. Exercise the pb format
    assertEquals(version, FSUtils.getVersion(fs, rootdir));
    FSUtils.checkVersion(fs, rootdir, true);
    // Write an old version file.
    String oldVersion = "1";
    writeVersionFile(versionFile, oldVersion);
    newVersion = FSUtils.getVersion(fs, rootdir);
    assertNotEquals(version, newVersion);
    thrown = false;
    try {
      FSUtils.checkVersion(fs, rootdir, true);
    } catch (FileSystemVersionException e) {
      thrown = true;
    }
    assertTrue("Expected FileSystemVersionException", thrown);
  }

  @Test
  public void testPermMask() throws Exception {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    // default fs permission
    FsPermission defaultFsPerm =
      CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    // 'hbase.data.umask.enable' is false. We will get the default fs permission.
    assertEquals(FsPermission.getFileDefault(), defaultFsPerm);

    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
    // first check that we don't crash if we don't have perms set
    FsPermission defaultStartPerm =
      CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    // The default 'hbase.data.umask' is 000, and that umask is used when
    // 'hbase.data.umask.enable' is true.
    // Therefore we will not get the real fs default in this case.
    // Instead we will get the starting point FULL_RWX_PERMISSIONS.
    assertEquals(new FsPermission(CommonFSUtils.FULL_RWX_PERMISSIONS), defaultStartPerm);

    conf.setStrings(HConstants.DATA_FILE_UMASK_KEY, "077");
    // now check that we get the right perms
    FsPermission filePerm =
      CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    assertEquals(new FsPermission("700"), filePerm);

    // then check that the correct file is created
    Path p = new Path("target" + File.separator + HBaseTestingUtility.getRandomUUID().toString());
    try {
      FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null);
      out.close();
      FileStatus stat = fs.getFileStatus(p);
      assertEquals(new FsPermission("700"), stat.getPermission());
      // and then cleanup
    } finally {
      fs.delete(p, true);
    }
  }

  @Test
  public void testDeleteAndExists() throws Exception {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    // then check that the correct file is created
    String file = HBaseTestingUtility.getRandomUUID().toString();
    Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
    Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
    try {
      FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null);
      out.close();
      assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p));
      // delete the file with recursion as false. Only the file will be deleted.
      CommonFSUtils.delete(fs, p, false);
      // Create another file
      FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null);
      out1.close();
      // delete the file with recursion as true. Still only the file will be deleted.
      CommonFSUtils.delete(fs, p1, true);
      assertFalse("The file should have been deleted", CommonFSUtils.isExists(fs, p1));
      // and then cleanup
    } finally {
      CommonFSUtils.delete(fs, p, true);
      CommonFSUtils.delete(fs, p1, true);
    }
  }

  @Test
  public void testFilteredStatusDoesNotThrowOnNotFound() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try {
      assertNull(FSUtils.listStatusWithStatusFilter(cluster.getFileSystem(),
        new Path("definitely/doesn't/exist"), null));
    } finally {
      cluster.shutdown();
    }
  }

  @Test
  public void testRenameAndSetModifyTime() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    assertTrue(CommonFSUtils.isHDFS(conf));

    FileSystem fs = FileSystem.get(conf);
    Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");

    String file = HBaseTestingUtility.getRandomUUID().toString();
    Path p = new Path(testDir, file);

    FSDataOutputStream out = fs.create(p);
    out.close();
    assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p));

    long expect = EnvironmentEdgeManager.currentTime() + 1000;
    assertNotEquals(expect, fs.getFileStatus(p).getModificationTime());

    ManualEnvironmentEdge mockEnv = new ManualEnvironmentEdge();
    mockEnv.setValue(expect);
    EnvironmentEdgeManager.injectEdge(mockEnv);
    try {
      String dstFile = HBaseTestingUtility.getRandomUUID().toString();
      Path dst = new Path(testDir, dstFile);

      assertTrue(CommonFSUtils.renameAndSetModifyTime(fs, p, dst));
      assertFalse("The moved file should not be present", CommonFSUtils.isExists(fs, p));
      assertTrue("The dst file should be present", CommonFSUtils.isExists(fs, dst));

      assertEquals(expect, fs.getFileStatus(dst).getModificationTime());
    } finally {
      // Reset the injected edge and shut the cluster down even if an assertion above fails.
      EnvironmentEdgeManager.reset();
      cluster.shutdown();
    }
  }

  @Test
  public void testSetStoragePolicyDefault() throws Exception {
    verifyNoHDFSApiInvocationForDefaultPolicy();
    verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY);
  }

  /**
   * Note: currently the default policy is to defer to HDFS, and this case verifies that logic.
   * The check will need to be removed if the default policy changes.
   */
  private void verifyNoHDFSApiInvocationForDefaultPolicy() {
    FileSystem testFs = new AlwaysFailSetStoragePolicyFileSystem();
    // No exception should be thrown when setting the default storage policy, which indicates
    // the HDFS API hasn't been called
    try {
      CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"),
        HConstants.DEFAULT_WAL_STORAGE_POLICY, true);
    } catch (IOException e) {
      Assert.fail("Should have bypassed the FS API when setting default storage policy");
    }
    // An exception should be thrown when given a non-default storage policy, which indicates the
    // HDFS API has been called
    try {
      CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"), "HOT", true);
      Assert.fail("Should have invoked the FS API but haven't");
    } catch (IOException e) {
      // expected given an invalid path
    }
  }

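  /**
   * A {@link DistributedFileSystem} whose {@code setStoragePolicy} always throws, used to detect
   * whether the HDFS API was invoked at all.
   */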
  class AlwaysFailSetStoragePolicyFileSystem extends DistributedFileSystem {
    @Override
    public void setStoragePolicy(final Path src, final String policyName) throws IOException {
      throw new IOException("The setStoragePolicy method is invoked");
    }
  }

  /* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */
  @Test
  public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception {
    verifyFileInDirWithStoragePolicy("ALL_SSD");
  }

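  // A deliberately bogus storage policy name; setting it should only log a warning
  // (see testSetStoragePolicyInvalid below).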
  private static final String INVALID_STORAGE_POLICY = "1772";

  /* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */
  @Test
  public void testSetStoragePolicyInvalid() throws Exception {
    verifyFileInDirWithStoragePolicy(INVALID_STORAGE_POLICY);
  }

  // Here instead of TestCommonFSUtils because we need a minicluster
  private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception {
    conf.set(HConstants.WAL_STORAGE_POLICY, policy);

    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try {
      assertTrue(CommonFSUtils.isHDFS(conf));

      FileSystem fs = FileSystem.get(conf);
      Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");
      fs.mkdirs(testDir);

      String storagePolicy =
        conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
      CommonFSUtils.setStoragePolicy(fs, testDir, storagePolicy);

      String file = HBaseTestingUtility.getRandomUUID().toString();
      Path p = new Path(testDir, file);
      writeDataToHDFS(fs, p, 4096);
      HFileSystem hfs = new HFileSystem(fs);
      String policySet = hfs.getStoragePolicyName(p);
      LOG.debug("The storage policy of path {} is {}", p, policySet);
      if (
        policy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)
          || policy.equals(INVALID_STORAGE_POLICY)
      ) {
        String hdfsDefaultPolicy = hfs.getStoragePolicyName(hfs.getHomeDirectory());
        LOG.debug("The default hdfs storage policy (indicated by home path: {}) is {}",
          hfs.getHomeDirectory(), hdfsDefaultPolicy);
        Assert.assertEquals(hdfsDefaultPolicy, policySet);
      } else {
        Assert.assertEquals(policy, policySet);
      }
      // will assert existence before deleting.
      cleanupFile(fs, testDir);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Ugly test that ensures we can get at the hedged read counters in the dfsclient. Does a bit of
   * positional reading (pread) with hedged reads enabled, using code taken from hdfs TestPread.
   */
  @Test
  public void testDFSHedgedReadMetrics() throws Exception {
    // Enable hedged reads and set it so the threshold is really low.
    // Most of this test is taken from HDFS, from TestPread.
    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
    conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
    // Set short retry timeouts so this test runs faster
    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
    conf.setBoolean("dfs.datanode.transferTo.allowed", false);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    // Get the metrics. Should be empty.
    DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
    assertEquals(0, metrics.getHedgedReadOps());
    FileSystem fileSys = cluster.getFileSystem();
    try {
      Path p = new Path("preadtest.dat");
      // We need more than one block to test out the hedged reads.
      DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize, blockSize, (short) 3,
        seed);
      pReadFile(fileSys, p);
      cleanupFile(fileSys, p);
      assertTrue(metrics.getHedgedReadOps() > 0);
    } finally {
      fileSys.close();
      cluster.shutdown();
    }
  }

  @Test
  public void testCopyFilesParallel() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try {
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();
      Path src = new Path("/src");
      fs.mkdirs(src);
      for (int i = 0; i < 50; i++) {
        writeDataToHDFS(fs, new Path(src, String.valueOf(i)), 1024);
      }
      Path sub = new Path(src, "sub");
      fs.mkdirs(sub);
      for (int i = 0; i < 50; i++) {
        writeDataToHDFS(fs, new Path(sub, String.valueOf(i)), 1024);
      }
      Path dst = new Path("/dst");
      List<Path> allFiles = FSUtils.copyFilesParallel(fs, src, fs, dst, conf, 4);

      assertEquals(102, allFiles.size());
      FileStatus[] list = fs.listStatus(dst);
      assertEquals(51, list.length);
      FileStatus[] sublist = fs.listStatus(new Path(dst, "sub"));
      assertEquals(50, sublist.length);
    } finally {
      // Shut the minicluster down even if an assertion fails, as the other tests here do.
      cluster.shutdown();
    }
  }

  // Below is taken from TestPread over in HDFS.
  static final int blockSize = 4096;
  static final long seed = 0xDEADBEEFL;
  private Random rand = new Random(); // This test depends on Random#setSeed

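  /**
   * Positional-read exercise adapted from HDFS TestPread: reads the 12-block file both
   * sequentially and via pread across block boundaries, verifying the data against the seeded
   * random content, and finally checks that reading beyond the end of the file fails.
   */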
  private void pReadFile(FileSystem fileSys, Path name) throws IOException {
    FSDataInputStream stm = fileSys.open(name);
    byte[] expected = new byte[12 * blockSize];
    rand.setSeed(seed);
    rand.nextBytes(expected);
    // do a sanity check. Read first 4K bytes
    byte[] actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
    // now do a pread for the first 8K bytes
    actual = new byte[8192];
    doPread(stm, 0L, actual, 0, 8192);
    checkAndEraseData(actual, 0, expected, "Pread Test 1");
    // Now check to see if the normal read returns 4K-8K byte range
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 4096, expected, "Pread Test 2");
    // Now see if we can cross a single block boundary successfully
    // read 4K bytes from blockSize - 2K offset
    stm.readFully(blockSize - 2048, actual, 0, 4096);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
    // now see if we can cross two block boundaries successfully
    // read blockSize + 4K bytes from blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(blockSize - 2048, actual);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
    // now see if we can cross two block boundaries that are not cached
    // read blockSize + 4K bytes from 10*blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(10 * blockSize - 2048, actual);
    checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
    // now check that even after all these preads, we can still read
    // bytes 8K-12K
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 8192, expected, "Pread Test 6");
    // done
    stm.close();
    // check block location caching
    stm = fileSys.open(name);
    stm.readFully(1, actual, 0, 4096);
    stm.readFully(4 * blockSize, actual, 0, 4096);
    stm.readFully(7 * blockSize, actual, 0, 4096);
    actual = new byte[3 * 4096];
    stm.readFully(0 * blockSize, actual, 0, 3 * 4096);
    checkAndEraseData(actual, 0, expected, "Pread Test 7");
    actual = new byte[8 * 4096];
    stm.readFully(3 * blockSize, actual, 0, 8 * 4096);
    checkAndEraseData(actual, 3 * blockSize, expected, "Pread Test 8");
    // read the tail
    stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize / 2);
    IOException res = null;
    try { // read beyond the end of the file
      stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize);
    } catch (IOException e) {
      // should throw an exception
      res = e;
    }
    assertTrue("Error reading beyond file boundary.", res != null);

    stm.close();
  }

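  /**
   * Verifies {@code actual} against {@code expected} starting at offset {@code from}, then zeroes
   * {@code actual}.
   */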
  private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
    for (int idx = 0; idx < actual.length; idx++) {
      // assertEquals(message, expected, actual): pass the expected value first so a failure
      // message reads correctly.
      assertEquals(message + " byte " + (from + idx) + " differs. expected " + expected[from + idx]
        + " actual " + actual[idx], expected[from + idx], actual[idx]);
      actual[idx] = 0;
    }
  }

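  /**
   * Positionally reads {@code length} bytes at {@code position} into {@code buffer}, starting at
   * {@code offset}.
   */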
  private void doPread(FSDataInputStream stm, long position, byte[] buffer, int offset, int length)
    throws IOException {
    int nread = 0;
    // long totalRead = 0;
    // DFSInputStream dfstm = null;

    /*
     * Disabled. The counts do not add up. Some issue in the original hdfs tests? if
     * (stm.getWrappedStream() instanceof DFSInputStream) { dfstm = (DFSInputStream)
     * (stm.getWrappedStream()); totalRead = dfstm.getReadStatistics().getTotalBytesRead(); }
     */

    while (nread < length) {
      int nbytes = stm.read(position + nread, buffer, offset + nread, length - nread);
      assertTrue("Error in pread", nbytes > 0);
      nread += nbytes;
    }

    /*
     * Disabled. The counts do not add up. Some issue in the original hdfs tests? if (dfstm != null)
     * { if (isHedgedRead) { assertTrue("Expected read statistic to be incremented", length <=
     * dfstm.getReadStatistics().getTotalBytesRead() - totalRead); } else {
     * assertEquals("Expected read statistic to be incremented", length, dfstm
     * .getReadStatistics().getTotalBytesRead() - totalRead); } }
     */
  }

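  /** Asserts the file exists, deletes it, and asserts it is gone. */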
  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
    assertTrue(fileSys.exists(name));
    assertTrue(fileSys.delete(name, true));
    assertFalse(fileSys.exists(name));
  }

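  // Log at class load whether the StreamCapabilities class is visible on the classpath
  // (it is absent in older Hadoop releases).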
  static {
    try {
      Class.forName("org.apache.hadoop.fs.StreamCapabilities");
      LOG.debug("Test thought StreamCapabilities class was present.");
    } catch (ClassNotFoundException exception) {
      LOG.debug("Test didn't think StreamCapabilities class was present.");
    }
  }

  // Here instead of TestCommonFSUtils because we need a minicluster
  @Test
  public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    // Open both the filesystem and the stream with try-with-resources so they are always closed.
    try (FileSystem filesystem = cluster.getFileSystem();
      FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar"))) {
      assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
      assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
      assertFalse(stream.hasCapability("a capability that hopefully HDFS doesn't add."));
    } finally {
      cluster.shutdown();
    }
  }

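  /**
   * Exercises {@link FSUtils#isSameHdfs(Configuration, FileSystem, FileSystem)} against a NameNode
   * assumed to be listening on {@code nnport}, including HA nameservice configurations.
   */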
  private void testIsSameHdfs(int nnport) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Path srcPath = new Path("hdfs://localhost:" + nnport + "/");
    Path desPath = new Path("hdfs://127.0.0.1/");
    FileSystem srcFs = srcPath.getFileSystem(conf);
    FileSystem desFs = desPath.getFileSystem(conf);

    assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));

    desPath = new Path("hdfs://127.0.0.1:8070/");
    desFs = desPath.getFileSystem(conf);
    assertFalse(FSUtils.isSameHdfs(conf, srcFs, desFs));

    desPath = new Path("hdfs://127.0.1.1:" + nnport + "/");
    desFs = desPath.getFileSystem(conf);
    assertFalse(FSUtils.isSameHdfs(conf, srcFs, desFs));

    conf.set("fs.defaultFS", "hdfs://haosong-hadoop");
    conf.set("dfs.nameservices", "haosong-hadoop");
    conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2");
    conf.set("dfs.client.failover.proxy.provider.haosong-hadoop",
      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:" + nnport);
    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000");
    desPath = new Path("/");
    desFs = desPath.getFileSystem(conf);
    assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));

    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:" + nnport);
    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000");
    desPath = new Path("/");
    desFs = desPath.getFileSystem(conf);
    assertFalse(FSUtils.isSameHdfs(conf, srcFs, desFs));
  }

  @Test
  public void testIsSameHdfs() throws IOException {
    String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion();
    LOG.info("hadoop version is: {}", hadoopVersion);
    boolean isHadoop3_0_0 = hadoopVersion.startsWith("3.0.0");
    if (isHadoop3_0_0) {
      // Hadoop 3.0.0 alpha1+ ~ 3.0.0 GA changed the default NameNode port to 9820.
      // See HDFS-9427.
      testIsSameHdfs(9820);
    } else {
      // Pre-3.0.0 Hadoop defaults to port 8020, and Hadoop 3.0.1 changed it back to 8020.
      // See HDFS-12990.
      testIsSameHdfs(8020);
    }
  }
}