/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test {@link FSUtils}.
 */
@Category({ MiscTests.class, MediumTests.class })
public class TestFSUtils {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFSUtils.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFSUtils.class);

  private HBaseTestingUtil htu;
  private FileSystem fs;
  private Configuration conf;

  @Before
  public void setUp() throws IOException {
    htu = new HBaseTestingUtil();
    fs = htu.getTestFileSystem();
    conf = htu.getConfiguration();
  }

  @Test
  public void testIsHDFS() throws Exception {
    assertFalse(CommonFSUtils.isHDFS(conf));
    MiniDFSCluster cluster = null;
    try {
      cluster = htu.startMiniDFSCluster(1);
      assertTrue(CommonFSUtils.isHDFS(conf));
      assertTrue(FSUtils.supportSafeMode(cluster.getFileSystem()));
      FSUtils.checkDfsSafeMode(conf);
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  @Test
  public void testLocalFileSystemSafeMode() throws Exception {
    conf.setClass("fs.file.impl", LocalFileSystem.class, FileSystem.class);
    assertFalse(CommonFSUtils.isHDFS(conf));
    assertFalse(FSUtils.supportSafeMode(FileSystem.get(conf)));
    FSUtils.checkDfsSafeMode(conf);
  }

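  /**
   * Creates {@code file}, overwriting it if present, and writes {@code dataSize} zero bytes to it.
   */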
  private void writeDataToHDFS(FileSystem fs, Path file, int dataSize) throws Exception {
    byte[] data = new byte[dataSize];
    try (FSDataOutputStream out = fs.create(file)) {
      out.write(data, 0, dataSize);
    }
  }

  @Test
  public void testComputeHDFSBlocksDistributionByInputStream() throws Exception {
    testComputeHDFSBlocksDistribution((fs, testFile) -> {
      try (FSDataInputStream open = fs.open(testFile)) {
        assertTrue(open instanceof HdfsDataInputStream);
        return FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) open);
      }
    });
  }

  @Test
  public void testComputeHDFSBlockDistribution() throws Exception {
    testComputeHDFSBlocksDistribution((fs, testFile) -> {
      FileStatus status = fs.getFileStatus(testFile);
      return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
    });
  }

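  /**
   * Resolves a path to its {@link HDFSBlocksDistribution}, so the same scenarios can be run
   * against both the stream-based and the FileStatus-based APIs.
   */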
  @FunctionalInterface
  interface HDFSBlockDistributionFunction {
    HDFSBlocksDistribution getForPath(FileSystem fs, Path path) throws IOException;
  }

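  /**
   * Runs three mini-cluster scenarios against the given block distribution function: a two-block
   * file on three datanodes, a three-block file on four datanodes, and a one-block file on four
   * datanodes. Each scenario polls for up to two seconds because the NameNode learns about block
   * locations asynchronously (see HBASE-6175).
   */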
  private void testComputeHDFSBlocksDistribution(
    HDFSBlockDistributionFunction fileToBlockDistribution) throws Exception {
    final int DEFAULT_BLOCK_SIZE = 1024;
    conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
    MiniDFSCluster cluster = null;
    Path testFile = null;

    try {
      // set up a cluster with 3 nodes
      String[] hosts = new String[] { "host1", "host2", "host3" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with two blocks
      testFile = new Path("/test1.txt");
      writeDataToHDFS(fs, testFile, 2 * DEFAULT_BLOCK_SIZE);

      // Given that the default replication factor is 3, the same as the number of datanodes, the
      // locality index for each host should be 100%; that is, getWeight for each host should equal
      // getUniqueBlocksTotalWeight.
      final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
      boolean ok;
      do {
        ok = true;

        HDFSBlocksDistribution blocksDistribution =
          fileToBlockDistribution.getForPath(fs, testFile);

        long uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();
        for (String host : hosts) {
          long weight = blocksDistribution.getWeight(host);
          ok = (ok && uniqueBlocksTotalWeight == weight);
        }
      } while (!ok && EnvironmentEdgeManager.currentTime() < maxTime);
      assertTrue(ok);
    } finally {
      htu.shutdownMiniDFSCluster();
    }

    try {
      // set up a cluster with 4 nodes
      String[] hosts = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with three blocks
      testFile = new Path("/test2.txt");
      writeDataToHDFS(fs, testFile, 3 * DEFAULT_BLOCK_SIZE);

      // Given that the default replication factor is 3, there will be a total of 9 block replicas;
      // thus the host with the highest weight should have weight == 3 * DEFAULT_BLOCK_SIZE.
      final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
      long weight;
      long uniqueBlocksTotalWeight;
      do {
        HDFSBlocksDistribution blocksDistribution =
          fileToBlockDistribution.getForPath(fs, testFile);
        uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();

        String topHost = blocksDistribution.getTopHosts().get(0);
        weight = blocksDistribution.getWeight(topHost);

        // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
      } while (uniqueBlocksTotalWeight != weight && EnvironmentEdgeManager.currentTime() < maxTime);
      assertEquals(uniqueBlocksTotalWeight, weight);
    } finally {
      htu.shutdownMiniDFSCluster();
    }

    try {
      // set up a cluster with 4 nodes
      String[] hosts = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with one block
      testFile = new Path("/test3.txt");
      writeDataToHDFS(fs, testFile, DEFAULT_BLOCK_SIZE);

      // Given that the default replication factor is 3, there will be a total of 3 block replicas;
      // thus one host ends up without any weight.
      final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
      HDFSBlocksDistribution blocksDistribution;
      do {
        blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile);
        // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
      } while (
        blocksDistribution.getTopHosts().size() != 3
          && EnvironmentEdgeManager.currentTime() < maxTime
      );
      assertEquals("Wrong number of hosts distributing blocks.", 3,
        blocksDistribution.getTopHosts().size());
    } finally {
      htu.shutdownMiniDFSCluster();
    }
  }

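  /**
   * Replaces any existing version file at {@code versionFile} with one containing {@code version},
   * written using {@code writeUTF}.
   */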
  private void writeVersionFile(Path versionFile, String version) throws IOException {
    if (CommonFSUtils.isExists(fs, versionFile)) {
      assertTrue(CommonFSUtils.delete(fs, versionFile, true));
    }
    try (FSDataOutputStream s = fs.create(versionFile)) {
      s.writeUTF(version);
    }
    assertTrue(fs.exists(versionFile));
  }

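  /**
   * Exercises the hbase.version file lifecycle: creation on a fresh install, failure when the file
   * is missing but a meta region directory exists, round-tripping the current version, and
   * rejection of an old version.
   */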
  @Test
  public void testVersion() throws DeserializationException, IOException {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    assertNull(FSUtils.getVersion(fs, rootdir));
    // No meta dir, so no complaint from checkVersion; it presumes a new install and creates the
    // version file.
    FSUtils.checkVersion(fs, rootdir, true);
    // Now remove the version file and create a meta dir so checkVersion fails.
    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    assertTrue(CommonFSUtils.isExists(fs, versionFile));
    assertTrue(CommonFSUtils.delete(fs, versionFile, true));
    Path metaRegionDir =
      FSUtils.getRegionDirFromRootDir(rootdir, RegionInfoBuilder.FIRST_META_REGIONINFO);
    FsPermission defaultPerms =
      CommonFSUtils.getFilePermissions(fs, this.conf, HConstants.DATA_FILE_UMASK_KEY);
    CommonFSUtils.create(fs, metaRegionDir, defaultPerms, false);
    boolean thrown = false;
    try {
      FSUtils.checkVersion(fs, rootdir, true);
    } catch (FileSystemVersionException e) {
      thrown = true;
    }
    assertTrue("Expected FileSystemVersionException", thrown);
    // Write out a good version file. See if we can read it in and convert.
    String version = HConstants.FILE_SYSTEM_VERSION;
    writeVersionFile(versionFile, version);
    FileStatus[] status = fs.listStatus(versionFile);
    assertNotNull(status);
    assertTrue(status.length > 0);
    String newVersion = FSUtils.getVersion(fs, rootdir);
    assertEquals(version.length(), newVersion.length());
    assertEquals(version, newVersion);
    // The file will have been converted. Exercise the protobuf format.
    assertEquals(version, FSUtils.getVersion(fs, rootdir));
    FSUtils.checkVersion(fs, rootdir, true);
    // Write an old version file.
    String oldVersion = "1";
    writeVersionFile(versionFile, oldVersion);
    newVersion = FSUtils.getVersion(fs, rootdir);
    assertNotEquals(version, newVersion);
    thrown = false;
    try {
      FSUtils.checkVersion(fs, rootdir, true);
    } catch (FileSystemVersionException e) {
      thrown = true;
    }
    assertTrue("Expected FileSystemVersionException", thrown);
  }

  @Test
  public void testPermMask() throws Exception {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    // default fs permission
    FsPermission defaultFsPerm =
      CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    // 'hbase.data.umask.enable' is false, so we get the default fs permission.
    assertEquals(FsPermission.getFileDefault(), defaultFsPerm);

    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
    // first check that we don't crash if we don't have perms set
    FsPermission defaultStartPerm =
      CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    // The default 'hbase.data.umask' is 000, and that umask is used when
    // 'hbase.data.umask.enable' is true. Therefore we do not get the real fs default here;
    // instead we get the starting point FULL_RWX_PERMISSIONS.
    assertEquals(new FsPermission(CommonFSUtils.FULL_RWX_PERMISSIONS), defaultStartPerm);

    conf.setStrings(HConstants.DATA_FILE_UMASK_KEY, "077");
    // now check that we get the right perms
    FsPermission filePerm =
      CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    assertEquals(new FsPermission("700"), filePerm);

    // then check that the file is created with the correct permission
    Path p = new Path("target" + File.separator + HBaseTestingUtil.getRandomUUID().toString());
    try {
      FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null);
      out.close();
      FileStatus stat = fs.getFileStatus(p);
      assertEquals(new FsPermission("700"), stat.getPermission());
      // and then cleanup
    } finally {
      fs.delete(p, true);
    }
  }

  @Test
  public void testDeleteAndExists() throws Exception {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    // check that the correct files are created and deleted
    String file = HBaseTestingUtil.getRandomUUID().toString();
    Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
    Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
    try {
      FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null);
      out.close();
      assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p));
      // delete the file with recursive as false. Only the file will be deleted.
      CommonFSUtils.delete(fs, p, false);
      // Create another file
      FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null);
      out1.close();
      // delete the file with recursive as true. The file will still be deleted.
      CommonFSUtils.delete(fs, p1, true);
      assertFalse("The deleted file should not be present", CommonFSUtils.isExists(fs, p1));
      // and then cleanup
    } finally {
      CommonFSUtils.delete(fs, p, true);
      CommonFSUtils.delete(fs, p1, true);
    }
  }

  @Test
  public void testFilteredStatusDoesNotThrowOnNotFound() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try {
      assertNull(FSUtils.listStatusWithStatusFilter(cluster.getFileSystem(),
        new Path("definitely/doesn't/exist"), null));
    } finally {
      cluster.shutdown();
    }
  }

  @Test
  public void testRenameAndSetModifyTime() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    assertTrue(CommonFSUtils.isHDFS(conf));

    FileSystem fs = FileSystem.get(conf);
    Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");

    String file = HBaseTestingUtil.getRandomUUID().toString();
    Path p = new Path(testDir, file);

    FSDataOutputStream out = fs.create(p);
    out.close();
    assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p));

    long expect = EnvironmentEdgeManager.currentTime() + 1000;
    assertNotEquals(expect, fs.getFileStatus(p).getModificationTime());

    ManualEnvironmentEdge mockEnv = new ManualEnvironmentEdge();
    mockEnv.setValue(expect);
    EnvironmentEdgeManager.injectEdge(mockEnv);
    try {
      String dstFile = HBaseTestingUtil.getRandomUUID().toString();
      Path dst = new Path(testDir, dstFile);

      assertTrue(CommonFSUtils.renameAndSetModifyTime(fs, p, dst));
      assertFalse("The moved file should not be present", CommonFSUtils.isExists(fs, p));
      assertTrue("The dst file should be present", CommonFSUtils.isExists(fs, dst));

      assertEquals(expect, fs.getFileStatus(dst).getModificationTime());
      cluster.shutdown();
    } finally {
      EnvironmentEdgeManager.reset();
    }
  }

  @Test
  public void testSetStoragePolicyDefault() throws Exception {
    verifyNoHDFSApiInvocationForDefaultPolicy();
    verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY);
  }

  /**
   * Note: the default policy is currently to defer to HDFS, and this test verifies that logic. The
   * check will need to be removed if the default policy changes.
   */
  private void verifyNoHDFSApiInvocationForDefaultPolicy() throws URISyntaxException, IOException {
    FileSystem testFs = new AlwaysFailSetStoragePolicyFileSystem();
    testFs.initialize(new URI("hdfs://localhost/"), conf);
    // No exception should be thrown when setting the default storage policy, which indicates
    // the HDFS API hasn't been called
    try {
      CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"),
        HConstants.DEFAULT_WAL_STORAGE_POLICY, true);
    } catch (IOException e) {
      Assert.fail("Should have bypassed the FS API when setting the default storage policy");
    }
    // An exception should be thrown when given a non-default storage policy, which indicates the
    // HDFS API has been called
    try {
      CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"), "HOT", true);
      Assert.fail("Should have invoked the FS API but did not");
    } catch (IOException e) {
      // expected given an invalid path
    }
  }

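  /**
   * A {@link DistributedFileSystem} whose setStoragePolicy always throws, used to detect whether
   * the HDFS storage policy API was actually invoked.
   */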
  static class AlwaysFailSetStoragePolicyFileSystem extends DistributedFileSystem {
    @Override
    public void setStoragePolicy(final Path src, final String policyName) throws IOException {
      throw new IOException("The setStoragePolicy method is invoked");
    }
  }

  /* Might log a warning, but should still work. (Always warns on Hadoop < 2.6.0.) */
  @Test
  public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception {
    verifyFileInDirWithStoragePolicy("ALL_SSD");
  }

  private static final String INVALID_STORAGE_POLICY = "1772";

  /* Should log a warning, but still work. (Different warning on Hadoop < 2.6.0.) */
  @Test
  public void testSetStoragePolicyInvalid() throws Exception {
    verifyFileInDirWithStoragePolicy(INVALID_STORAGE_POLICY);
  }

  // Here instead of TestCommonFSUtils because we need a minicluster
  private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception {
    conf.set(HConstants.WAL_STORAGE_POLICY, policy);

    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try {
      assertTrue(CommonFSUtils.isHDFS(conf));

      FileSystem fs = FileSystem.get(conf);
      Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");
      fs.mkdirs(testDir);

      String storagePolicy =
        conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
      CommonFSUtils.setStoragePolicy(fs, testDir, storagePolicy);

      String file = HBaseTestingUtil.getRandomUUID().toString();
      Path p = new Path(testDir, file);
      writeDataToHDFS(fs, p, 4096);
      HFileSystem hfs = new HFileSystem(fs);
      String policySet = hfs.getStoragePolicyName(p);
      LOG.debug("The storage policy of path {} is {}", p, policySet);
      if (
        policy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)
          || policy.equals(INVALID_STORAGE_POLICY)
      ) {
        String hdfsDefaultPolicy = hfs.getStoragePolicyName(hfs.getHomeDirectory());
        LOG.debug("The default hdfs storage policy (indicated by home path: {}) is {}",
          hfs.getHomeDirectory(), hdfsDefaultPolicy);
        Assert.assertEquals(hdfsDefaultPolicy, policySet);
      } else {
        Assert.assertEquals(policy, policySet);
      }
      // will assert existence before deleting.
      cleanupFile(fs, testDir);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Ugly test that ensures we can get at the hedged read counters in the dfsclient. Does a bit of
   * positional reading with hedged reads enabled, using code taken from the HDFS TestPread.
   */
  @Test
  public void testDFSHedgedReadMetrics() throws Exception {
    // Enable hedged reads and set it so the threshold is really low.
    // Most of this test is taken from HDFS, from TestPread.
    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
    conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
    // Set short retry timeouts so this test runs faster
    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
    conf.setBoolean("dfs.datanode.transferTo.allowed", false);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    // Get the metrics. Should be empty.
    DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
    assertEquals(0, metrics.getHedgedReadOps());
    FileSystem fileSys = cluster.getFileSystem();
    try {
      Path p = new Path("preadtest.dat");
      // We need > 1 block to test out the hedged reads.
      DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize, blockSize, (short) 3,
        seed);
      pReadFile(fileSys, p);
      cleanupFile(fileSys, p);
      assertTrue(metrics.getHedgedReadOps() > 0);
    } finally {
      fileSys.close();
      cluster.shutdown();
    }
  }

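  /**
   * Writes 50 files under /src and another 50 under /src/sub, copies the tree to /dst with
   * {@code FSUtils.copyFilesParallel}, and verifies the destination listings.
   */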
  @Test
  public void testCopyFilesParallel() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();
    Path src = new Path("/src");
    fs.mkdirs(src);
    for (int i = 0; i < 50; i++) {
      writeDataToHDFS(fs, new Path(src, String.valueOf(i)), 1024);
    }
    Path sub = new Path(src, "sub");
    fs.mkdirs(sub);
    for (int i = 0; i < 50; i++) {
      writeDataToHDFS(fs, new Path(sub, String.valueOf(i)), 1024);
    }
    Path dst = new Path("/dst");
    List<Path> allFiles = FSUtils.copyFilesParallel(fs, src, fs, dst, conf, 4);

    assertEquals(102, allFiles.size());
    FileStatus[] list = fs.listStatus(dst);
    assertEquals(51, list.length);
    FileStatus[] sublist = fs.listStatus(new Path(dst, "sub"));
    assertEquals(50, sublist.length);
  }

  // Below is taken from TestPread over in HDFS.
  static final int blockSize = 4096;
  static final long seed = 0xDEADBEEFL;
  private Random rand = new Random(); // This test depends on Random#setSeed

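  /**
   * Exercises positional reads across single and multiple block boundaries, then re-reads with a
   * fresh stream to check block location caching; adapted from the HDFS TestPread.
   */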
  private void pReadFile(FileSystem fileSys, Path name) throws IOException {
    FSDataInputStream stm = fileSys.open(name);
    byte[] expected = new byte[12 * blockSize];
    rand.setSeed(seed);
    rand.nextBytes(expected);
    // do a sanity check. Read first 4K bytes
    byte[] actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
    // now do a pread for the first 8K bytes
    actual = new byte[8192];
    doPread(stm, 0L, actual, 0, 8192);
    checkAndEraseData(actual, 0, expected, "Pread Test 1");
    // Now check to see if the normal read returns the 4K-8K byte range
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 4096, expected, "Pread Test 2");
    // Now see if we can cross a single block boundary successfully:
    // read 4K bytes from blockSize - 2K offset
    stm.readFully(blockSize - 2048, actual, 0, 4096);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
    // now see if we can cross two block boundaries successfully:
    // read blockSize + 4K bytes from blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(blockSize - 2048, actual);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
    // now see if we can cross two block boundaries that are not cached:
    // read blockSize + 4K bytes from 10*blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(10 * blockSize - 2048, actual);
    checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
    // now check that even after all these preads, we can still read
    // bytes 8K-12K
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 8192, expected, "Pread Test 6");
    // done
    stm.close();
    // check block location caching
    stm = fileSys.open(name);
    stm.readFully(1, actual, 0, 4096);
    stm.readFully(4 * blockSize, actual, 0, 4096);
    stm.readFully(7 * blockSize, actual, 0, 4096);
    actual = new byte[3 * 4096];
    stm.readFully(0 * blockSize, actual, 0, 3 * 4096);
    checkAndEraseData(actual, 0, expected, "Pread Test 7");
    actual = new byte[8 * 4096];
    stm.readFully(3 * blockSize, actual, 0, 8 * 4096);
    checkAndEraseData(actual, 3 * blockSize, expected, "Pread Test 8");
    // read the tail
    stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize / 2);
    IOException res = null;
    try { // read beyond the end of the file
      stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize);
    } catch (IOException e) {
      // should throw an exception
      res = e;
    }
    assertNotNull("Expected an exception when reading beyond the file boundary", res);

    stm.close();
  }

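  /**
   * Verifies that {@code actual} matches {@code expected} starting at offset {@code from}, then
   * zeroes out {@code actual} so the buffer can be reused.
   */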
  private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
    for (int idx = 0; idx < actual.length; idx++) {
      assertEquals(message + " byte " + (from + idx) + " differs. expected " + expected[from + idx]
        + " actual " + actual[idx], expected[from + idx], actual[idx]);
      actual[idx] = 0;
    }
  }

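  /**
   * Positional read of {@code length} bytes into {@code buffer}, looping until the requested range
   * has been read completely.
   */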
  private void doPread(FSDataInputStream stm, long position, byte[] buffer, int offset, int length)
    throws IOException {
    int nread = 0;
    // long totalRead = 0;
    // DFSInputStream dfstm = null;

    /*
     * Disabled. These counts do not add up; some issue in the original hdfs tests? if
     * (stm.getWrappedStream() instanceof DFSInputStream) { dfstm = (DFSInputStream)
     * (stm.getWrappedStream()); totalRead = dfstm.getReadStatistics().getTotalBytesRead(); }
     */

    while (nread < length) {
      int nbytes = stm.read(position + nread, buffer, offset + nread, length - nread);
      assertTrue("Error in pread", nbytes > 0);
      nread += nbytes;
    }

    /*
     * Disabled. These counts do not add up; some issue in the original hdfs tests? if (dfstm !=
     * null) { if (isHedgedRead) { assertTrue("Expected read statistic to be incremented", length
     * <= dfstm.getReadStatistics().getTotalBytesRead() - totalRead); } else {
     * assertEquals("Expected read statistic to be incremented", length, dfstm
     * .getReadStatistics().getTotalBytesRead() - totalRead); } }
     */
  }

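  /** Asserts {@code name} exists, deletes it recursively, and asserts it is gone. */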
  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
    assertTrue(fileSys.exists(name));
    assertTrue(fileSys.delete(name, true));
    assertFalse(fileSys.exists(name));
  }

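  // Logs at class-load time whether the StreamCapabilities class could be found on the classpath.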
  static {
    try {
      Class.forName("org.apache.hadoop.fs.StreamCapabilities");
      LOG.debug("Test thought StreamCapabilities class was present.");
    } catch (ClassNotFoundException exception) {
      LOG.debug("Test didn't think StreamCapabilities class was present.");
    }
  }

  // Here instead of TestCommonFSUtils because we need a minicluster
  @Test
  public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try (FileSystem filesystem = cluster.getFileSystem()) {
      FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar"));
      assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
      assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
      assertFalse(stream.hasCapability("a capability that hopefully HDFS doesn't add."));
    } finally {
      cluster.shutdown();
    }
  }

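  /**
   * Checks FSUtils.isSameHdfs against a namenode on {@code nnport}: the same host and port should
   * match, a different port or host should not, and an HA nameservice should match only when one
   * of its namenodes resolves to the source namenode address.
   */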
  private void testIsSameHdfs(int nnport) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Path srcPath = new Path("hdfs://localhost:" + nnport + "/");
    Path desPath = new Path("hdfs://127.0.0.1/");
    FileSystem srcFs = srcPath.getFileSystem(conf);
    FileSystem desFs = desPath.getFileSystem(conf);

    assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));

    desPath = new Path("hdfs://127.0.0.1:8070/");
    desFs = desPath.getFileSystem(conf);
    assertFalse(FSUtils.isSameHdfs(conf, srcFs, desFs));

    desPath = new Path("hdfs://127.0.1.1:" + nnport + "/");
    desFs = desPath.getFileSystem(conf);
    assertFalse(FSUtils.isSameHdfs(conf, srcFs, desFs));

    conf.set("fs.defaultFS", "hdfs://haosong-hadoop");
    conf.set("dfs.nameservices", "haosong-hadoop");
    conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2");
    conf.set("dfs.client.failover.proxy.provider.haosong-hadoop",
      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:" + nnport);
    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000");
    desPath = new Path("/");
    desFs = desPath.getFileSystem(conf);
    assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));

    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:" + nnport);
    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000");
    desPath = new Path("/");
    desFs = desPath.getFileSystem(conf);
    assertFalse(FSUtils.isSameHdfs(conf, srcFs, desFs));
  }

  @Test
  public void testIsSameHdfs() throws IOException {
    String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion();
    LOG.info("hadoop version is: {}", hadoopVersion);
    boolean isHadoop3_0_0 = hadoopVersion.startsWith("3.0.0");
    if (isHadoop3_0_0) {
      // Hadoop 3.0.0 alpha1+ through 3.0.0 GA changed the default NN port to 9820.
      // See HDFS-9427.
      testIsSameHdfs(9820);
    } else {
      // Pre-Hadoop 3.0.0 defaulted to port 8020, and Hadoop 3.0.1 changed it back to 8020.
      // See HDFS-12990.
      testIsSameHdfs(8020);
    }
  }
}