001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.apache.hadoop.fs.CommonPathCapabilities.FS_STORAGEPOLICY;
021
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.net.URI;
025import java.net.URISyntaxException;
026import java.util.List;
027import java.util.Locale;
028import java.util.Map;
029import java.util.concurrent.ConcurrentHashMap;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FSDataOutputStream;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.LocatedFileStatus;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.fs.PathFilter;
037import org.apache.hadoop.fs.RemoteIterator;
038import org.apache.hadoop.fs.permission.FsPermission;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
046
047/**
048 * Utility methods for interacting with the underlying file system.
049 * <p/>
050 * Note that {@link #setStoragePolicy(FileSystem, Path, String)} is tested in TestFSUtils and
051 * pre-commit will run the hbase-server tests if there's code change in this class. See
052 * <a href="https://issues.apache.org/jira/browse/HBASE-20838">HBASE-20838</a> for more details.
053 */
054@InterfaceAudience.Private
055public final class CommonFSUtils {
056  private static final Logger LOG = LoggerFactory.getLogger(CommonFSUtils.class);
057
058  /** Parameter name for HBase WAL directory */
059  public static final String HBASE_WAL_DIR = "hbase.wal.dir";
060
061  /** Parameter to disable stream capability enforcement checks */
062  public static final String UNSAFE_STREAM_CAPABILITY_ENFORCE =
063    "hbase.unsafe.stream.capability.enforce";
064
065  /** Full access permissions (starting point for a umask) */
066  public static final String FULL_RWX_PERMISSIONS = "777";
067
068  private CommonFSUtils() {
069  }
070
071  /**
   * Compares path components. Does not consider the scheme; i.e. even if the schemes differ, as
   * long as <code>path</code> starts with <code>rootPath</code>, the function returns true.
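   * <p/>
   * A minimal illustration (the namenode host and paths are made up); both calls return true
   * because only the URI path components are compared:
   *
   * <pre>{@code
   * CommonFSUtils.isStartingWithPath(new Path("hdfs://nn/hbase"), "/hbase/data/default/t1");
   * CommonFSUtils.isStartingWithPath(new Path("/hbase"), "hdfs://nn/hbase/data/default/t1");
   * }</pre>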
074   * @param rootPath value to check for
075   * @param path     subject to check
076   * @return True if <code>path</code> starts with <code>rootPath</code>
077   */
078  public static boolean isStartingWithPath(final Path rootPath, final String path) {
079    String uriRootPath = rootPath.toUri().getPath();
080    String tailUriPath = new Path(path).toUri().getPath();
081    return tailUriPath.startsWith(uriRootPath);
082  }
083
084  /**
   * Compares the path component of the Path URI; e.g. given hdfs://a/b/c and /a/b/c, it compares
   * the '/a/b/c' part. Does not consider the scheme; i.e. even if the schemes differ, matching
   * paths or subpaths are considered equal.
088   * @param pathToSearch Path we will be trying to match against.
089   * @param pathTail     what to match
090   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
091   */
092  public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
093    return isMatchingTail(pathToSearch, new Path(pathTail));
094  }
095
096  /**
   * Compares the path component of the Path URI; e.g. given hdfs://a/b/c and /a/b/c, it compares
   * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider the scheme; i.e. even if the schemes differ, matching paths or subpaths equate.
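   * <p/>
   * For example, echoing the contract above (the scheme and authority are illustrative):
   *
   * <pre>{@code
   * // true: the trailing 'b/c' components match; the hdfs scheme is ignored
   * boolean matches = CommonFSUtils.isMatchingTail(new Path("hdfs://a/b/c"), "b/c");
   * }</pre>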
   * @param pathToSearch Path we will be trying to match against
101   * @param pathTail     what to match
102   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
103   */
104  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
105    if (pathToSearch.depth() != pathTail.depth()) {
106      return false;
107    }
108    Path tailPath = pathTail;
109    String tailName;
110    Path toSearch = pathToSearch;
111    String toSearchName;
112    boolean result = false;
113    do {
114      tailName = tailPath.getName();
115      if (tailName == null || tailName.length() <= 0) {
116        result = true;
117        break;
118      }
119      toSearchName = toSearch.getName();
120      if (toSearchName == null || toSearchName.length() <= 0) {
121        break;
122      }
123      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
124      tailPath = tailPath.getParent();
125      toSearch = toSearch.getParent();
126    } while (tailName.equals(toSearchName));
127    return result;
128  }
129
130  /**
131   * Delete if exists.
132   * @param fs  filesystem object
133   * @param dir directory to delete
134   * @return True if deleted <code>dir</code>
135   * @throws IOException e
136   */
137  public static boolean deleteDirectory(final FileSystem fs, final Path dir) throws IOException {
138    return fs.exists(dir) && fs.delete(dir, true);
139  }
140
141  /**
   * Return the number of bytes that large input files should optimally be split into to minimize
   * I/O time.
144   * @param fs filesystem object
145   * @return the default block size for the path's filesystem
146   */
147  public static long getDefaultBlockSize(final FileSystem fs, final Path path) {
148    return fs.getDefaultBlockSize(path);
149  }
150
151  /*
152   * Get the default replication.
153   * @param fs filesystem object
154   * @param f path of file
155   * @return default replication for the path's filesystem
156   */
157  public static short getDefaultReplication(final FileSystem fs, final Path path) {
158    return fs.getDefaultReplication(path);
159  }
160
161  /**
162   * Returns the default buffer size to use during writes. The size of the buffer should probably be
163   * a multiple of hardware page size (4096 on Intel x86), and it determines how much data is
164   * buffered during read and write operations.
165   * @param fs filesystem object
166   * @return default buffer size to use during writes
167   */
168  public static int getDefaultBufferSize(final FileSystem fs) {
169    return fs.getConf().getInt("io.file.buffer.size", 4096);
170  }
171
172  /**
173   * Create the specified file on the filesystem. By default, this will:
174   * <ol>
175   * <li>apply the umask in the configuration (if it is enabled)</li>
176   * <li>use the fs configured buffer size (or 4096 if not set)</li>
177   * <li>use the default replication</li>
178   * <li>use the default block size</li>
179   * <li>not track progress</li>
180   * </ol>
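   * <p/>
   * A minimal usage sketch, assuming an existing {@link FileSystem} instance {@code fs} and an
   * illustrative path:
   *
   * <pre>{@code
   * FSDataOutputStream out = CommonFSUtils.create(fs, new Path("/hbase/.tmp/example"),
   *   FsPermission.getFileDefault(), true);
   * try {
   *   out.writeUTF("example payload");
   * } finally {
   *   out.close();
   * }
   * }</pre>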
181   * @param fs        {@link FileSystem} on which to write the file
182   * @param path      {@link Path} to the file to write
   * @param perm      initial permissions
184   * @param overwrite Whether or not the created file should be overwritten.
185   * @return output stream to the created file
186   * @throws IOException if the file cannot be created
187   */
188  public static FSDataOutputStream create(FileSystem fs, Path path, FsPermission perm,
189    boolean overwrite) throws IOException {
190    if (LOG.isTraceEnabled()) {
191      LOG.trace("Creating file={} with permission={}, overwrite={}", path, perm, overwrite);
192    }
193    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
194      getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
195  }
196
197  /**
198   * Get the file permissions specified in the configuration, if they are enabled.
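   * <p/>
   * A sketch of typical use, assuming a {@link FileSystem} instance {@code fs}; the umask property
   * key below is illustrative, pass whichever key your caller reads:
   *
   * <pre>{@code
   * Configuration conf = new Configuration();
   * conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
   * conf.set("hbase.data.umask", "022");
   * // full 777 permissions masked with 022 yield rwxr-xr-x
   * FsPermission perm = CommonFSUtils.getFilePermissions(fs, conf, "hbase.data.umask");
   * }</pre>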
199   * @param fs               filesystem that the file will be created on.
200   * @param conf             configuration to read for determining if permissions are enabled and
201   *                         which to use
   * @param permissionConfKey property key in the configuration to use when finding the permission
   * @return the permission to use when creating a new file on the fs. If special permissions are
   *         not specified in the configuration, then the default permissions on the fs will be
   *         returned.
206   */
207  public static FsPermission getFilePermissions(final FileSystem fs, final Configuration conf,
    final String permissionConfKey) {
209    boolean enablePermissions = conf.getBoolean(HConstants.ENABLE_DATA_FILE_UMASK, false);
210
211    if (enablePermissions) {
212      try {
213        FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
214        // make sure that we have a mask, if not, go default.
        String mask = conf.get(permissionConfKey);
216        if (mask == null) {
217          return FsPermission.getFileDefault();
218        }
        // apply the umask
220        FsPermission umask = new FsPermission(mask);
221        return perm.applyUMask(umask);
222      } catch (IllegalArgumentException e) {
223        LOG.warn("Incorrect umask attempted to be created: " + conf.get(permssionConfKey)
224          + ", using default file permissions.", e);
225        return FsPermission.getFileDefault();
226      }
227    }
228    return FsPermission.getFileDefault();
229  }
230
231  /**
232   * Verifies root directory path is a valid URI with a scheme
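   * <p/>
   * For example (the paths are illustrative):
   *
   * <pre>{@code
   * CommonFSUtils.validateRootPath(new Path("hdfs://nn/hbase")); // passes, scheme present
   * CommonFSUtils.validateRootPath(new Path("/hbase")); // throws IOException, no scheme
   * }</pre>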
233   * @param root root directory path
234   * @return Passed <code>root</code> argument.
235   * @throws IOException if not a valid URI with a scheme
236   */
237  public static Path validateRootPath(Path root) throws IOException {
238    try {
239      URI rootURI = new URI(root.toString());
240      String scheme = rootURI.getScheme();
241      if (scheme == null) {
242        throw new IOException("Root directory does not have a scheme");
243      }
244      return root;
245    } catch (URISyntaxException e) {
246      throw new IOException("Root directory path is not a valid " + "URI -- check your "
247        + HConstants.HBASE_DIR + " configuration", e);
248    }
249  }
250
251  /**
   * Checks for the presence of the WAL root path (taken from the provided conf object) in the
   * given path. If present, this method removes it and returns the String representation of the
   * remaining relative path.
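   * <p/>
   * For example, assuming the WAL root resolves to the illustrative {@code hdfs://nn/hbase-wal}:
   *
   * <pre>{@code
   * // returns "WALs/s1"
   * String rel = CommonFSUtils.removeWALRootPath(new Path("hdfs://nn/hbase-wal/WALs/s1"), conf);
   * // returns "/some/other/dir" unchanged, since it is not under the WAL root
   * String other = CommonFSUtils.removeWALRootPath(new Path("/some/other/dir"), conf);
   * }</pre>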
255   * @param path must not be null
256   * @param conf must not be null
257   * @return String representation of the remaining relative path
258   * @throws IOException from underlying filesystem
259   */
260  public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
261    Path root = getWALRootDir(conf);
262    String pathStr = path.toString();
    // If the path does not start with the WAL root, return it unchanged.
    if (!pathStr.startsWith(root.toString())) {
      return pathStr;
    }
    // Otherwise strip the WAL root prefix, including the trailing "/".
    return pathStr.substring(root.toString().length() + 1);
269  }
270
271  /**
272   * Return the 'path' component of a Path. In Hadoop, Path is a URI. This method returns the 'path'
273   * component of a Path's URI: e.g. If a Path is
274   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>, this method returns
275   * <code>/hbase_trunk/TestTable/compaction.dir</code>. This method is useful if you want to print
   * out a Path without the qualifying FileSystem instance.
277   * @param p Filesystem Path whose 'path' component we are to return.
   * @return the 'path' portion of the passed Path's URI
279   */
280  public static String getPath(Path p) {
281    return p.toUri().getPath();
282  }
283
284  /**
285   * Get the path for the root data directory
286   * @param c configuration
287   * @return {@link Path} to hbase root directory from configuration as a qualified Path.
288   * @throws IOException e
289   */
290  public static Path getRootDir(final Configuration c) throws IOException {
291    Path p = new Path(c.get(HConstants.HBASE_DIR));
292    FileSystem fs = p.getFileSystem(c);
293    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
294  }
295
296  public static void setRootDir(final Configuration c, final Path root) {
297    c.set(HConstants.HBASE_DIR, root.toString());
298  }
299
300  public static void setFsDefault(final Configuration c, final Path root) {
301    c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+
302  }
303
304  public static void setFsDefault(final Configuration c, final String uri) {
305    c.set("fs.defaultFS", uri); // for hadoop 0.21+
306  }
307
308  public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
309    Path p = getRootDir(c);
310    return p.getFileSystem(c);
311  }
312
313  /**
314   * Get the path for the root directory for WAL data
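   * <p/>
   * A minimal sketch (the URIs are illustrative):
   *
   * <pre>{@code
   * Configuration conf = new Configuration();
   * conf.set(HConstants.HBASE_DIR, "hdfs://nn/hbase");
   * conf.set(CommonFSUtils.HBASE_WAL_DIR, "hdfs://nn/hbase-wal");
   * Path walRoot = CommonFSUtils.getWALRootDir(conf); // hdfs://nn/hbase-wal
   * }</pre>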
315   * @param c configuration
   * @return {@link Path} to the HBase WAL root directory, as configured by {@value HBASE_WAL_DIR},
   *         returned as a qualified Path. Defaults to the HBase root dir.
318   * @throws IOException e
319   */
320  public static Path getWALRootDir(final Configuration c) throws IOException {
321    Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
322    if (!isValidWALRootDir(p, c)) {
323      return getRootDir(c);
324    }
325    FileSystem fs = p.getFileSystem(c);
326    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
327  }
328
329  /**
   * Returns the URI of the given path in string format, if the path has a scheme
   * @param c configuration
   * @param p path
   * @return the URI in string format, or null if the path has no scheme
334   */
335  public static String getDirUri(final Configuration c, Path p) throws IOException {
336    if (p.toUri().getScheme() != null) {
337      return p.toUri().toString();
338    }
339    return null;
340  }
341
342  public static void setWALRootDir(final Configuration c, final Path root) {
343    c.set(HBASE_WAL_DIR, root.toString());
344  }
345
346  public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
347    Path p = getWALRootDir(c);
348    FileSystem fs = p.getFileSystem(c);
349    // hadoop-core does fs caching, so need to propagate this if set
350    String enforceStreamCapability = c.get(UNSAFE_STREAM_CAPABILITY_ENFORCE);
351    if (enforceStreamCapability != null) {
352      fs.getConf().set(UNSAFE_STREAM_CAPABILITY_ENFORCE, enforceStreamCapability);
353    }
354    return fs;
355  }
356
357  private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
358    Path rootDir = getRootDir(c);
359    FileSystem fs = walDir.getFileSystem(c);
360    Path qualifiedWalDir = walDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
361    if (!qualifiedWalDir.equals(rootDir)) {
362      if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
363        throw new IllegalStateException("Illegal WAL directory specified. "
364          + "WAL directories are not permitted to be under root directory: rootDir="
365          + rootDir.toString() + ", qualifiedWALDir=" + qualifiedWalDir);
366      }
367    }
368    return true;
369  }
370
371  /**
372   * Returns the WAL region directory based on the given table name and region name
373   * @param conf              configuration to determine WALRootDir
374   * @param tableName         Table that the region is under
375   * @param encodedRegionName Region name used for creating the final region directory
376   * @return the region directory used to store WALs under the WALRootDir
377   * @throws IOException if there is an exception determining the WALRootDir
378   */
379  public static Path getWALRegionDir(final Configuration conf, final TableName tableName,
380    final String encodedRegionName) throws IOException {
381    return new Path(getWALTableDir(conf, tableName), encodedRegionName);
382  }
383
384  /**
385   * Returns the Table directory under the WALRootDir for the specified table name
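   * <p/>
   * For example, for a table {@code ns:tbl} this resolves to the {@code ns/tbl} directory under
   * the base namespace directory of the WAL root:
   *
   * <pre>{@code
   * Path tableDir = CommonFSUtils.getWALTableDir(conf, TableName.valueOf("ns", "tbl"));
   * }</pre>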
386   * @param conf      configuration used to get the WALRootDir
387   * @param tableName Table to get the directory for
388   * @return a path to the WAL table directory for the specified table
389   * @throws IOException if there is an exception determining the WALRootDir
390   */
391  public static Path getWALTableDir(final Configuration conf, final TableName tableName)
392    throws IOException {
393    Path baseDir = new Path(getWALRootDir(conf), HConstants.BASE_NAMESPACE_DIR);
394    return new Path(new Path(baseDir, tableName.getNamespaceAsString()),
395      tableName.getQualifierAsString());
396  }
397
398  /**
399   * For backward compatibility with HBASE-20734, where we store recovered edits in a wrong
400   * directory without BASE_NAMESPACE_DIR. See HBASE-22617 for more details.
401   * @deprecated For compatibility, will be removed in 4.0.0.
402   */
403  @Deprecated
404  public static Path getWrongWALRegionDir(final Configuration conf, final TableName tableName,
405    final String encodedRegionName) throws IOException {
406    Path wrongTableDir = new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
407      tableName.getQualifierAsString());
408    return new Path(wrongTableDir, encodedRegionName);
409  }
410
411  /**
412   * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
413   * path rootdir
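   * <p/>
   * For example (the root dir is illustrative):
   *
   * <pre>{@code
   * // e.g. hdfs://nn/hbase/data/default/t1
   * Path tableDir = CommonFSUtils.getTableDir(new Path("hdfs://nn/hbase"),
   *   TableName.valueOf("default", "t1"));
   * }</pre>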
414   * @param rootdir   qualified path of HBase root directory
415   * @param tableName name of table
416   * @return {@link org.apache.hadoop.fs.Path} for table
417   */
418  public static Path getTableDir(Path rootdir, final TableName tableName) {
419    return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
420      tableName.getQualifierAsString());
421  }
422
423  /**
424   * Returns the {@link org.apache.hadoop.fs.Path} object representing the region directory under
425   * path rootdir
426   * @param rootdir    qualified path of HBase root directory
427   * @param tableName  name of table
428   * @param regionName The encoded region name
429   * @return {@link org.apache.hadoop.fs.Path} for region
430   */
431  public static Path getRegionDir(Path rootdir, TableName tableName, String regionName) {
432    return new Path(getTableDir(rootdir, tableName), regionName);
433  }
434
435  /**
   * Returns the {@link org.apache.hadoop.hbase.TableName} object for the table whose directory is
   * the given path
   * @param tablePath path of the table directory
   * @return {@link org.apache.hadoop.hbase.TableName} for the table
440   */
441  public static TableName getTableName(Path tablePath) {
442    return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
443  }
444
445  /**
446   * Returns the {@link org.apache.hadoop.fs.Path} object representing the namespace directory under
447   * path rootdir
448   * @param rootdir   qualified path of HBase root directory
449   * @param namespace namespace name
   * @return {@link org.apache.hadoop.fs.Path} for namespace
451   */
452  public static Path getNamespaceDir(Path rootdir, final String namespace) {
453    return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, new Path(namespace)));
454  }
455
456  // this mapping means that under a federated FileSystem implementation, we'll
457  // only log the first failure from any of the underlying FileSystems at WARN and all others
458  // will be at DEBUG.
459  private static final Map<FileSystem, Boolean> warningMap = new ConcurrentHashMap<>();
460
461  /**
462   * Sets storage policy for given path. If the passed path is a directory, we'll set the storage
463   * policy for all files created in the future in said directory. Note that this change in storage
464   * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle. If
465   * we're running on a version of FileSystem that doesn't support the given storage policy (or
466   * storage policies at all), then we'll issue a log message and continue. See
467   * http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
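   * <p/>
   * A typical call might look like the following sketch (the WAL locations are illustrative);
   * failures are logged rather than thrown:
   *
   * <pre>{@code
   * FileSystem walFs = CommonFSUtils.getWALFileSystem(conf);
   * Path walDir = new Path(CommonFSUtils.getWALRootDir(conf), "WALs");
   * CommonFSUtils.setStoragePolicy(walFs, walDir, "ONE_SSD");
   * }</pre>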
   * @param fs            We only do anything if it implements a setStoragePolicy method
469   * @param path          the Path whose storage policy is to be set
   * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
   *                      org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g.
   *                      'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
473   */
474  public static void setStoragePolicy(final FileSystem fs, final Path path,
475    final String storagePolicy) {
476    try {
477      setStoragePolicy(fs, path, storagePolicy, false);
478    } catch (IOException e) {
      // should never arrive here, since we asked setStoragePolicy not to throw
      LOG.warn("Unexpected exception from setStoragePolicy, which was asked not to throw", e);
481    }
482  }
483
484  static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy,
485    boolean throwException) throws IOException {
486    if (storagePolicy == null) {
487      if (LOG.isTraceEnabled()) {
488        LOG.trace("We were passed a null storagePolicy, exiting early.");
489      }
490      return;
491    }
492    String trimmedStoragePolicy = storagePolicy.trim();
493    if (trimmedStoragePolicy.isEmpty()) {
494      LOG.trace("We were passed an empty storagePolicy, exiting early.");
495      return;
496    } else {
497      trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
498    }
499    if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
500      LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.", trimmedStoragePolicy);
501      return;
502    }
503    try {
504      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
505    } catch (IOException e) {
506      LOG.trace("Failed to invoke set storage policy API on FS", e);
507      if (throwException) {
508        throw e;
509      }
510    }
511  }
512
513  /*
514   * All args have been checked and are good. Run the setStoragePolicy invocation.
515   */
516  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
517    final String storagePolicy) throws IOException {
518    Exception toThrow = null;
519
520    if (!fs.hasPathCapability(path, FS_STORAGEPOLICY)) {
521      LOG.debug("The file system does not support storage policy.");
522      return;
523    }
524
525    try {
526      fs.setStoragePolicy(path, storagePolicy);
527      LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
528    } catch (Exception e) {
529      toThrow = e;
530      // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
531      // misuse than a runtime problem with HDFS.
532      if (!warningMap.containsKey(fs)) {
533        warningMap.put(fs, true);
534        LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". "
535          + "DEBUG log level might have more details.", e);
536      } else if (LOG.isDebugEnabled()) {
537        LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
538      }
539
540      // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
541      // that throws UnsupportedOperationException
542      if (e instanceof UnsupportedOperationException) {
543        if (LOG.isDebugEnabled()) {
544          LOG.debug("The underlying FileSystem implementation doesn't support "
545            + "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 "
546            + "appears to be present in your version of Hadoop. For more information check "
547            + "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem "
548            + "specification docs from HADOOP-11981, and/or related documentation from the "
549            + "provider of the underlying FileSystem (its name should appear in the "
550            + "stacktrace that accompanies this message). Note in particular that Hadoop's "
551            + "local filesystem implementation doesn't support storage policies.", e);
552        }
553      }
554    }
555
556    if (toThrow != null) {
557      throw new IOException(toThrow);
558    }
559  }
560
561  /**
562   * Return true if this is a filesystem whose scheme is 'hdfs'.
563   * @throws IOException from underlying FileSystem
564   */
565  public static boolean isHDFS(final Configuration conf) throws IOException {
566    FileSystem fs = FileSystem.get(conf);
567    String scheme = fs.getUri().getScheme();
568    return scheme.equalsIgnoreCase("hdfs");
569  }
570
571  /**
   * Checks if the given path contains a 'recovered.edits' directory.
   * @param path must not be null
   * @return True if the path contains a recovered.edits directory
575   */
576  public static boolean isRecoveredEdits(Path path) {
577    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
578  }
579
580  /**
581   * Returns the filesystem of the hbase rootdir.
582   * @throws IOException from underlying FileSystem
583   */
584  public static FileSystem getCurrentFileSystem(Configuration conf) throws IOException {
585    return getRootDir(conf).getFileSystem(conf);
586  }
587
588  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between Hadoop versions: Hadoop 1 does not throw a FileNotFoundException and
   * returns an empty FileStatus[], while Hadoop 2 throws FileNotFoundException. Where possible,
   * prefer FSUtils#listStatusWithStatusFilter(FileSystem, Path, FileStatusFilter) instead.
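   * <p/>
   * Callers must handle the null return, e.g. (the filter below is illustrative):
   *
   * <pre>{@code
   * FileStatus[] tmpFiles = CommonFSUtils.listStatus(fs, dir, p -> p.getName().endsWith(".tmp"));
   * if (tmpFiles != null) {
   *   for (FileStatus status : tmpFiles) {
   *     LOG.debug("Found {}", status.getPath());
   *   }
   * }
   * }</pre>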
593   * @param fs     file system
594   * @param dir    directory
595   * @param filter path filter
596   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
597   */
598  public static FileStatus[] listStatus(final FileSystem fs, final Path dir,
599    final PathFilter filter) throws IOException {
600    FileStatus[] status = null;
601    try {
602      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
603    } catch (FileNotFoundException fnfe) {
604      // if directory doesn't exist, return null
605      if (LOG.isTraceEnabled()) {
606        LOG.trace("{} doesn't exist", dir);
607      }
608    }
609    if (status == null || status.length < 1) {
610      return null;
611    }
612    return status;
613  }
614
615  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between Hadoop versions.
618   * @param fs  file system
619   * @param dir directory
620   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
621   */
622  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
623    return listStatus(fs, dir, null);
624  }
625
626  /**
   * Calls fs.listFiles() to get FileStatus and BlockLocations together, reducing RPC calls
628   * @param fs  file system
629   * @param dir directory
   * @return LocatedFileStatus list, or null if the directory is empty or does not exist
631   */
632  public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs, final Path dir)
633    throws IOException {
634    List<LocatedFileStatus> status = null;
635    try {
636      RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(dir, false);
637      while (locatedFileStatusRemoteIterator.hasNext()) {
638        if (status == null) {
639          status = Lists.newArrayList();
640        }
641        status.add(locatedFileStatusRemoteIterator.next());
642      }
643    } catch (FileNotFoundException fnfe) {
644      // if directory doesn't exist, return null
645      if (LOG.isTraceEnabled()) {
646        LOG.trace("{} doesn't exist", dir);
647      }
648    }
649    return status;
650  }
651
652  /**
   * Calls fs.delete() and returns its result
654   * @param fs        must not be null
655   * @param path      must not be null
656   * @param recursive delete tree rooted at path
   * @return the value returned by fs.delete()
658   * @throws IOException from underlying FileSystem
659   */
660  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
661    throws IOException {
662    return fs.delete(path, recursive);
663  }
664
665  /**
666   * Calls fs.exists(). Checks if the specified path exists
667   * @param fs   must not be null
668   * @param path must not be null
669   * @return the value returned by fs.exists()
670   * @throws IOException from underlying FileSystem
671   */
672  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
673    return fs.exists(path);
674  }
675
676  /**
677   * Log the current state of the filesystem from a certain root directory
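   * <p/>
   * For example, to dump the tree under the HBase root dir at DEBUG level:
   *
   * <pre>{@code
   * CommonFSUtils.logFileSystemState(fs, CommonFSUtils.getRootDir(conf), LOG);
   * }</pre>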
678   * @param fs   filesystem to investigate
679   * @param root root file/directory to start logging from
680   * @param log  log to output information
681   * @throws IOException if an unexpected exception occurs
682   */
683  public static void logFileSystemState(final FileSystem fs, final Path root, Logger log)
684    throws IOException {
685    log.debug("File system contents for path {}", root);
686    logFSTree(log, fs, root, "|-");
687  }
688
689  /**
690   * Recursive helper to log the state of the FS
691   * @see #logFileSystemState(FileSystem, Path, Logger)
692   */
693  private static void logFSTree(Logger log, final FileSystem fs, final Path root, String prefix)
694    throws IOException {
695    FileStatus[] files = listStatus(fs, root, null);
696    if (files == null) {
697      return;
698    }
699
700    for (FileStatus file : files) {
701      if (file.isDirectory()) {
702        log.debug(prefix + file.getPath().getName() + "/");
703        logFSTree(log, fs, file.getPath(), prefix + "---");
704      } else {
705        log.debug(prefix + file.getPath().getName());
706      }
707    }
708  }
709
710  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
711    throws IOException {
712    // set the modify time for TimeToLive Cleaner
713    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
714    return fs.rename(src, dest);
715  }
716
717  /**
   * Check if the short-circuit read buffer size is set and, if not, set it to the value HBase wants.
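   * <p/>
   * A minimal sketch of the effect:
   *
   * <pre>{@code
   * CommonFSUtils.checkShortCircuitReadBufferSize(conf);
   * // now holds hbase.dfs.client.read.shortcircuit.buffer.size if that was configured, otherwise
   * // 2 * HConstants.DEFAULT_BLOCKSIZE; a pre-existing value is left untouched
   * int size = conf.getInt("dfs.client.read.shortcircuit.buffer.size", -1);
   * }</pre>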
719   * @param conf must not be null
720   */
721  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
722    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
723    final int notSet = -1;
724    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
725    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
726    int size = conf.getInt(dfsKey, notSet);
727    // If a size is set, return -- we will use it.
728    if (size != notSet) {
729      return;
730    }
731    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
732    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
733    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
734  }
735
736  /**
   * Helper exception for those cases where the place where we need to check a stream capability is
   * not where we have the context needed to explain the impact and mitigation of that capability
   * being absent.
739   */
740  public static class StreamLacksCapabilityException extends Exception {
741    public StreamLacksCapabilityException(String message, Throwable cause) {
742      super(message, cause);
743    }
744
745    public StreamLacksCapabilityException(String message) {
746      super(message);
747    }
748  }
749}