/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Utility methods for interacting with the underlying file system.
 * <p/>
 * Note that {@link #setStoragePolicy(FileSystem, Path, String)} is tested in TestFSUtils and
 * pre-commit will run the hbase-server tests if there's a code change in this class. See
 * <a href="https://issues.apache.org/jira/browse/HBASE-20838">HBASE-20838</a> for more details.
 */
@InterfaceAudience.Private
public final class CommonFSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(CommonFSUtils.class);

  /** Parameter name for HBase WAL directory */
  public static final String HBASE_WAL_DIR = "hbase.wal.dir";

  /** Parameter to disable stream capability enforcement checks */
  public static final String UNSAFE_STREAM_CAPABILITY_ENFORCE =
    "hbase.unsafe.stream.capability.enforce";

  /** Full access permissions (starting point for a umask) */
  public static final String FULL_RWX_PERMISSIONS = "777";

  private CommonFSUtils() {
  }

  /**
   * Compares path components. Does not consider the URI scheme; i.e. if the schemes differ but
   * <code>path</code> starts with <code>rootPath</code>, the function returns true.
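   * <p/>
   * An illustrative sketch (the paths shown are hypothetical):
   *
   * <pre>
   * CommonFSUtils.isStartingWithPath(new Path("hdfs://nn:8020/hbase"), "/hbase/data"); // true
   * CommonFSUtils.isStartingWithPath(new Path("hdfs://nn:8020/hbase"), "/tmp/data");   // false
   * </pre>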
   * @param rootPath value to check for
   * @param path     subject to check
   * @return True if <code>path</code> starts with <code>rootPath</code>
   */
  public static boolean isStartingWithPath(final Path rootPath, final String path) {
    String uriRootPath = rootPath.toUri().getPath();
    String tailUriPath = new Path(path).toUri().getPath();
    return tailUriPath.startsWith(uriRootPath);
  }

  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. Does not consider the scheme; i.e. if the schemes differ but the path or
   * subpath matches, the two will equate.
   * @param pathToSearch Path we will be trying to match against.
   * @param pathTail     what to match
   * @return True if <code>pathTail</code> is a tail of the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
    return isMatchingTail(pathToSearch, new Path(pathTail));
  }

  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. Does not consider the scheme; i.e. if the schemes differ but the paths
   * match component by component, the two will equate. The two paths must have the same depth.
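   * <p/>
   * An illustrative sketch (the paths shown are hypothetical):
   *
   * <pre>
   * // Same path components, different scheme/authority: matches.
   * CommonFSUtils.isMatchingTail(new Path("hdfs://nn:8020/a/b/c"), new Path("/a/b/c")); // true
   * // Different depth: does not match.
   * CommonFSUtils.isMatchingTail(new Path("hdfs://nn:8020/a/b/c"), new Path("b/c"));    // false
   * </pre>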
   * @param pathToSearch Path we will be trying to match against
   * @param pathTail     what to match
   * @return True if <code>pathTail</code> is a tail of the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    if (pathToSearch.depth() != pathTail.depth()) {
      return false;
    }
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;
    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.length() <= 0) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.length() <= 0) {
        break;
      }
      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while (tailName.equals(toSearchName));
    return result;
  }

  /**
   * Delete if exists.
   * @param fs  filesystem object
   * @param dir directory to delete
   * @return True if deleted <code>dir</code>
   * @throws IOException e
   */
  public static boolean deleteDirectory(final FileSystem fs, final Path dir) throws IOException {
    return fs.exists(dir) && fs.delete(dir, true);
  }

  /**
   * Return the number of bytes that large input files should optimally be split into to minimize
   * I/O time.
   * @param fs   filesystem object
   * @param path path of file
   * @return the default block size for the path's filesystem
   */
  public static long getDefaultBlockSize(final FileSystem fs, final Path path) {
    return fs.getDefaultBlockSize(path);
  }

  /**
   * Get the default replication.
   * @param fs   filesystem object
   * @param path path of file
   * @return default replication for the path's filesystem
   */
  public static short getDefaultReplication(final FileSystem fs, final Path path) {
    return fs.getDefaultReplication(path);
  }

  /**
   * Returns the default buffer size to use during writes. The size of the buffer should probably be
   * a multiple of hardware page size (4096 on Intel x86), and it determines how much data is
   * buffered during read and write operations.
   * @param fs filesystem object
   * @return default buffer size to use during writes
   */
  public static int getDefaultBufferSize(final FileSystem fs) {
    return fs.getConf().getInt("io.file.buffer.size", 4096);
  }

  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the default replication</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
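   * <p/>
   * A minimal usage sketch (assumes {@code fs} is already set up; the path and payload are
   * hypothetical):
   *
   * <pre>
   * FsPermission perm = FsPermission.getFileDefault();
   * try (FSDataOutputStream out = CommonFSUtils.create(fs, new Path("/tmp/example"), perm, true)) {
   *   out.writeBytes("hello");
   * }
   * </pre>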
   * @param fs        {@link FileSystem} on which to write the file
   * @param path      {@link Path} to the file to write
   * @param perm      initial permissions
   * @param overwrite Whether or not the created file should be overwritten.
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream create(FileSystem fs, Path path, FsPermission perm,
    boolean overwrite) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Creating file={} with permission={}, overwrite={}", path, perm, overwrite);
    }
    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
      getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
  }

  /**
   * Get the file permissions specified in the configuration, if they are enabled.
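   * <p/>
   * A minimal usage sketch (assumes {@code fs} and {@code conf} are already set up; the key shown
   * is illustrative, and the umask is only applied when the umask feature is enabled):
   *
   * <pre>
   * conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
   * conf.set("hbase.data.umask", "000");
   * FsPermission perm = CommonFSUtils.getFilePermissions(fs, conf, "hbase.data.umask");
   * </pre>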
   * @param fs               filesystem that the file will be created on.
   * @param conf             configuration to read for determining if permissions are enabled and
   *                         which to use
   * @param permssionConfKey property key in the configuration to use when finding the permission
   * @return the permission to use when creating a new file on the fs. If special permissions are
   *         not specified in the configuration, then the default permissions on the fs will be
   *         returned.
   */
  public static FsPermission getFilePermissions(final FileSystem fs, final Configuration conf,
    final String permssionConfKey) {
    boolean enablePermissions = conf.getBoolean(HConstants.ENABLE_DATA_FILE_UMASK, false);

    if (enablePermissions) {
      try {
        FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
        // make sure that we have a mask, if not, go default.
        String mask = conf.get(permssionConfKey);
        if (mask == null) {
          return FsPermission.getFileDefault();
        }
        // apply the umask
        FsPermission umask = new FsPermission(mask);
        return perm.applyUMask(umask);
      } catch (IllegalArgumentException e) {
        LOG.warn("Incorrect umask attempted to be created: " + conf.get(permssionConfKey)
          + ", using default file permissions.", e);
        return FsPermission.getFileDefault();
      }
    }
    return FsPermission.getFileDefault();
  }

  /**
   * Verifies root directory path is a valid URI with a scheme
   * @param root root directory path
   * @return Passed <code>root</code> argument.
   * @throws IOException if not a valid URI with a scheme
   */
  public static Path validateRootPath(Path root) throws IOException {
    try {
      URI rootURI = new URI(root.toString());
      String scheme = rootURI.getScheme();
      if (scheme == null) {
        throw new IOException("Root directory does not have a scheme");
      }
      return root;
    } catch (URISyntaxException e) {
      throw new IOException("Root directory path is not a valid " + "URI -- check your "
        + HConstants.HBASE_DIR + " configuration", e);
    }
  }

  /**
   * Checks for the presence of the WAL root path (using the provided conf object) in the given
   * path. If present, this method removes it and returns the String representation of the
   * remaining relative path.
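   * <p/>
   * An illustrative sketch (the WAL root directory shown is hypothetical):
   *
   * <pre>
   * // With hbase.wal.dir (or hbase.rootdir) resolving to hdfs://nn:8020/hbase:
   * CommonFSUtils.removeWALRootPath(new Path("hdfs://nn:8020/hbase/WALs/server1"), conf);
   * // returns "WALs/server1"
   * </pre>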
   * @param path must not be null
   * @param conf must not be null
   * @return String representation of the remaining relative path
   * @throws IOException from underlying filesystem
   */
  public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
    Path root = getWALRootDir(conf);
    String pathStr = path.toString();
    // If the path does not contain the WAL root path, return it as-is.
    if (!pathStr.startsWith(root.toString())) {
      return pathStr;
    }
    // Otherwise strip the WAL root path, including the trailing "/".
    return pathStr.substring(root.toString().length() + 1);
  }

  /**
   * Return the 'path' component of a Path. In Hadoop, Path is a URI. This method returns the 'path'
   * component of a Path's URI: e.g. If a Path is
   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>, this method returns
   * <code>/hbase_trunk/TestTable/compaction.dir</code>. This method is useful if you want to print
   * out a Path without the qualifying FileSystem instance.
   * @param p Filesystem Path whose 'path' component we are to return.
   * @return the 'path' component of the Path's URI
   */
  public static String getPath(Path p) {
    return p.toUri().getPath();
  }

  /**
   * Get the path for the root data directory
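   * <p/>
   * A minimal usage sketch (assumes a {@code conf} instance; the hbase.rootdir value shown is
   * hypothetical):
   *
   * <pre>
   * conf.set(HConstants.HBASE_DIR, "hdfs://nn:8020/hbase");
   * Path rootDir = CommonFSUtils.getRootDir(conf); // qualified hdfs://nn:8020/hbase
   * </pre>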
   * @param c configuration
   * @return {@link Path} to hbase root directory from configuration as a qualified Path.
   * @throws IOException e
   */
  public static Path getRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HConstants.HBASE_DIR));
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  public static void setRootDir(final Configuration c, final Path root) {
    c.set(HConstants.HBASE_DIR, root.toString());
  }

  public static void setFsDefault(final Configuration c, final Path root) {
    c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+
  }

  public static void setFsDefault(final Configuration c, final String uri) {
    c.set("fs.defaultFS", uri); // for hadoop 0.21+
  }

  public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
    Path p = getRootDir(c);
    return p.getFileSystem(c);
  }

  /**
   * Get the path for the root directory for WAL data
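   * <p/>
   * A minimal usage sketch (assumes a {@code conf} instance; the directory value shown is
   * hypothetical):
   *
   * <pre>
   * conf.set(CommonFSUtils.HBASE_WAL_DIR, "hdfs://nn:8020/hbasewal");
   * Path walRoot = CommonFSUtils.getWALRootDir(conf); // falls back to hbase.rootdir if unset
   * </pre>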
   * @param c configuration
   * @return {@link Path} to hbase log root directory: e.g. {@value HBASE_WAL_DIR} from
   *         configuration as a qualified Path. Defaults to HBase root dir.
   * @throws IOException e
   */
  public static Path getWALRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
    if (!isValidWALRootDir(p, c)) {
      return getRootDir(c);
    }
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  /**
   * Returns the URI of the given path in string form, if the path has a scheme.
   * @param c configuration
   * @param p path
   * @return the URI in string form, or null if the path has no scheme
   */
  public static String getDirUri(final Configuration c, Path p) throws IOException {
    if (p.toUri().getScheme() != null) {
      return p.toUri().toString();
    }
    return null;
  }

  public static void setWALRootDir(final Configuration c, final Path root) {
    c.set(HBASE_WAL_DIR, root.toString());
  }

  public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
    Path p = getWALRootDir(c);
    FileSystem fs = p.getFileSystem(c);
    // hadoop-core does fs caching, so need to propagate this if set
    String enforceStreamCapability = c.get(UNSAFE_STREAM_CAPABILITY_ENFORCE);
    if (enforceStreamCapability != null) {
      fs.getConf().set(UNSAFE_STREAM_CAPABILITY_ENFORCE, enforceStreamCapability);
    }
    return fs;
  }

  private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
    Path rootDir = getRootDir(c);
    FileSystem fs = walDir.getFileSystem(c);
    Path qualifiedWalDir = walDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    if (!qualifiedWalDir.equals(rootDir)) {
      if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
        throw new IllegalStateException("Illegal WAL directory specified. "
          + "WAL directories are not permitted to be under root directory: rootDir="
          + rootDir.toString() + ", qualifiedWALDir=" + qualifiedWalDir);
      }
    }
    return true;
  }

  /**
   * Returns the WAL region directory based on the given table name and region name
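   * <p/>
   * An illustrative sketch (the WAL root directory shown is hypothetical):
   *
   * <pre>
   * // With WAL root hdfs://nn:8020/hbase, table "ns:tbl" and encoded region "abc123", returns
   * // hdfs://nn:8020/hbase/data/ns/tbl/abc123
   * Path dir = CommonFSUtils.getWALRegionDir(conf, TableName.valueOf("ns", "tbl"), "abc123");
   * </pre>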
   * @param conf              configuration to determine WALRootDir
   * @param tableName         Table that the region is under
   * @param encodedRegionName Region name used for creating the final region directory
   * @return the region directory used to store WALs under the WALRootDir
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALRegionDir(final Configuration conf, final TableName tableName,
    final String encodedRegionName) throws IOException {
    return new Path(getWALTableDir(conf, tableName), encodedRegionName);
  }

  /**
   * Returns the Table directory under the WALRootDir for the specified table name
   * @param conf      configuration used to get the WALRootDir
   * @param tableName Table to get the directory for
   * @return a path to the WAL table directory for the specified table
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALTableDir(final Configuration conf, final TableName tableName)
    throws IOException {
    Path baseDir = new Path(getWALRootDir(conf), HConstants.BASE_NAMESPACE_DIR);
    return new Path(new Path(baseDir, tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
  }

  /**
   * For backward compatibility with HBASE-20734, where we stored recovered edits in the wrong
   * directory, without BASE_NAMESPACE_DIR. See HBASE-22617 for more details.
   * @deprecated For compatibility, will be removed in 4.0.0.
   */
  @Deprecated
  public static Path getWrongWALRegionDir(final Configuration conf, final TableName tableName,
    final String encodedRegionName) throws IOException {
    Path wrongTableDir = new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
    return new Path(wrongTableDir, encodedRegionName);
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
   * path rootdir
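   * <p/>
   * An illustrative sketch (the root directory shown is hypothetical):
   *
   * <pre>
   * // With rootdir hdfs://nn:8020/hbase and table "ns:tbl", returns
   * // hdfs://nn:8020/hbase/data/ns/tbl
   * Path tableDir = CommonFSUtils.getTableDir(rootdir, TableName.valueOf("ns", "tbl"));
   * </pre>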
   * @param rootdir   qualified path of HBase root directory
   * @param tableName name of table
   * @return {@link org.apache.hadoop.fs.Path} for table
   */
  public static Path getTableDir(Path rootdir, final TableName tableName) {
    return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the region directory under
   * path rootdir
   * @param rootdir    qualified path of HBase root directory
   * @param tableName  name of table
   * @param regionName The encoded region name
   * @return {@link org.apache.hadoop.fs.Path} for region
   */
  public static Path getRegionDir(Path rootdir, TableName tableName, String regionName) {
    return new Path(getTableDir(rootdir, tableName), regionName);
  }

  /**
   * Returns the {@link org.apache.hadoop.hbase.TableName} object corresponding to the given table
   * directory path
   * @param tablePath path of table
   * @return {@link org.apache.hadoop.hbase.TableName} for the table
   */
  public static TableName getTableName(Path tablePath) {
    return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the namespace directory under
   * path rootdir
   * @param rootdir   qualified path of HBase root directory
   * @param namespace namespace name
   * @return {@link org.apache.hadoop.fs.Path} for the namespace
   */
  public static Path getNamespaceDir(Path rootdir, final String namespace) {
    return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, new Path(namespace)));
  }

  // this mapping means that under a federated FileSystem implementation, we'll
  // only log the first failure from any of the underlying FileSystems at WARN and all others
  // will be at DEBUG.
  private static final Map<FileSystem, Boolean> warningMap = new ConcurrentHashMap<>();

  /**
   * Sets storage policy for given path. If the passed path is a directory, we'll set the storage
   * policy for all files created in the future in said directory. Note that this change in storage
   * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle. If
   * we're running on a version of FileSystem that doesn't support the given storage policy (or
   * storage policies at all), then we'll issue a log message and continue. See
   * http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
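   * <p/>
   * A minimal usage sketch (assumes an HDFS-backed {@code fs}; the path is hypothetical):
   *
   * <pre>
   * // Prefer SSD-backed volumes for files created under this directory in the future.
   * CommonFSUtils.setStoragePolicy(fs, new Path("/hbase/data/default/t1/cf"), "ONE_SSD");
   * </pre>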
   * @param fs            We only do anything if it implements a setStoragePolicy method
   * @param path          the Path whose storage policy is to be set
   * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
   *                      org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g.
   *                      'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
   */
  public static void setStoragePolicy(final FileSystem fs, final Path path,
    final String storagePolicy) {
    try {
      setStoragePolicy(fs, path, storagePolicy, false);
    } catch (IOException e) {
      // should never arrive here
      LOG.warn("We have chosen not to throw the exception, but one was unexpectedly thrown", e);
    }
  }

  static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy,
    boolean throwException) throws IOException {
    if (storagePolicy == null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("We were passed a null storagePolicy, exiting early.");
      }
      return;
    }
    String trimmedStoragePolicy = storagePolicy.trim();
    if (trimmedStoragePolicy.isEmpty()) {
      LOG.trace("We were passed an empty storagePolicy, exiting early.");
      return;
    } else {
      trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
    }
    if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
      LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.", trimmedStoragePolicy);
      return;
    }
    try {
      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
    } catch (IOException e) {
      LOG.trace("Failed to invoke set storage policy API on FS", e);
      if (throwException) {
        throw e;
      }
    }
  }

  /*
   * All args have been checked and are good. Run the setStoragePolicy invocation.
   */
  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
    final String storagePolicy) throws IOException {
    Exception toThrow = null;

    try {
      fs.setStoragePolicy(path, storagePolicy);
      LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
    } catch (Exception e) {
      toThrow = e;
      // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
      // misuse than a runtime problem with HDFS.
      if (!warningMap.containsKey(fs)) {
        warningMap.put(fs, true);
        LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". "
          + "DEBUG log level might have more details.", e);
      } else if (LOG.isDebugEnabled()) {
        LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
      }

      // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
      // that throws UnsupportedOperationException
      if (e instanceof UnsupportedOperationException) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("The underlying FileSystem implementation doesn't support "
            + "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 "
            + "appears to be present in your version of Hadoop. For more information check "
            + "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem "
            + "specification docs from HADOOP-11981, and/or related documentation from the "
            + "provider of the underlying FileSystem (its name should appear in the "
            + "stacktrace that accompanies this message). Note in particular that Hadoop's "
            + "local filesystem implementation doesn't support storage policies.", e);
        }
      }
    }

    if (toThrow != null) {
      throw new IOException(toThrow);
    }
  }

  /**
   * Return true if this is a filesystem whose scheme is 'hdfs'.
   * @throws IOException from underlying FileSystem
   */
  public static boolean isHDFS(final Configuration conf) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    String scheme = fs.getUri().getScheme();
    return scheme.equalsIgnoreCase("hdfs");
  }

  /**
   * Checks if the given path is the one with 'recovered.edits' dir.
   * @param path must not be null
   * @return True if the path contains a 'recovered.edits' directory
   */
  public static boolean isRecoveredEdits(Path path) {
    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
  }

  /**
   * Returns the filesystem of the hbase rootdir.
   * @throws IOException from underlying FileSystem
   */
  public static FileSystem getCurrentFileSystem(Configuration conf) throws IOException {
    return getRootDir(conf).getFileSystem(conf);
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException but
   * returns an empty FileStatus[], while Hadoop 2 will throw a FileNotFoundException. Where
   * possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem, Path, FileStatusFilter)
   * instead.
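   * <p/>
   * A minimal usage sketch showing the null contract (assumes {@code fs} and {@code dir} are
   * already set up):
   *
   * <pre>
   * FileStatus[] statuses = CommonFSUtils.listStatus(fs, dir, null);
   * if (statuses == null) {
   *   // dir does not exist, is empty, or nothing matched the filter
   * }
   * </pre>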
   * @param fs     file system
   * @param dir    directory
   * @param filter path filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   */
  public static FileStatus[] listStatus(final FileSystem fs, final Path dir,
    final PathFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} doesn't exist", dir);
      }
    }
    if (status == null || status.length < 1) {
      return null;
    }
    return status;
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between hadoop versions.
   * @param fs  file system
   * @param dir directory
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   */
  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
    return listStatus(fs, dir, null);
  }

  /**
   * Calls fs.listFiles() to get FileStatus and BlockLocations together, reducing RPC calls
   * @param fs  file system
   * @param dir directory
   * @return LocatedFileStatus list
   */
  public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs, final Path dir)
    throws IOException {
    List<LocatedFileStatus> status = null;
    try {
      RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(dir, false);
      while (locatedFileStatusRemoteIterator.hasNext()) {
        if (status == null) {
          status = Lists.newArrayList();
        }
        status.add(locatedFileStatusRemoteIterator.next());
      }
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} doesn't exist", dir);
      }
    }
    return status;
  }

  /**
   * Calls fs.delete() and returns the value returned by the call
   * @param fs        must not be null
   * @param path      must not be null
   * @param recursive delete tree rooted at path
   * @return the value returned by fs.delete()
   * @throws IOException from underlying FileSystem
   */
  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
    throws IOException {
    return fs.delete(path, recursive);
  }

  /**
   * Calls fs.exists() to check if the specified path exists.
   * @param fs   must not be null
   * @param path must not be null
   * @return the value returned by fs.exists()
   * @throws IOException from underlying FileSystem
   */
  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
    return fs.exists(path);
  }

  /**
   * Log the current state of the filesystem from a certain root directory
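   * <p/>
   * A minimal usage sketch (assumes {@code fs}, {@code conf}, and a {@code LOG} logger are already
   * set up):
   *
   * <pre>
   * CommonFSUtils.logFileSystemState(fs, CommonFSUtils.getRootDir(conf), LOG);
   * </pre>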
   * @param fs   filesystem to investigate
   * @param root root file/directory to start logging from
   * @param log  logger to output information to
   * @throws IOException if an unexpected exception occurs
   */
  public static void logFileSystemState(final FileSystem fs, final Path root, Logger log)
    throws IOException {
    log.debug("File system contents for path {}", root);
    logFSTree(log, fs, root, "|-");
  }

  /**
   * Recursive helper to log the state of the FS
   * @see #logFileSystemState(FileSystem, Path, Logger)
   */
  private static void logFSTree(Logger log, final FileSystem fs, final Path root, String prefix)
    throws IOException {
    FileStatus[] files = listStatus(fs, root, null);
    if (files == null) {
      return;
    }

    for (FileStatus file : files) {
      if (file.isDirectory()) {
        log.debug(prefix + file.getPath().getName() + "/");
        logFSTree(log, fs, file.getPath(), prefix + "---");
      } else {
        log.debug(prefix + file.getPath().getName());
      }
    }
  }

  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
    throws IOException {
    // set the modify time for TimeToLive Cleaner
    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
    return fs.rename(src, dest);
  }

  /**
   * Check if the short-circuit read buffer size is set and if not, set it to the HBase default.
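   * <p/>
   * A minimal usage sketch (assumes a {@code conf} instance):
   *
   * <pre>
   * // If dfs.client.read.shortcircuit.buffer.size is unset, this seeds it with the
   * // HBase-preferred size (by default 2 * HConstants.DEFAULT_BLOCKSIZE).
   * CommonFSUtils.checkShortCircuitReadBufferSize(conf);
   * </pre>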
   * @param conf must not be null
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in Hadoop 2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) {
      return;
    }
    // But the short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }

  private static final class DfsBuilderUtility {
    private static final Class<?> BUILDER;
    private static final Method REPLICATE;
    private static final Method NO_LOCAL_WRITE;

    static {
      String builderName =
        "org.apache.hadoop.hdfs.DistributedFileSystem$HdfsDataOutputStreamBuilder";
      Class<?> builderClass = null;
      try {
        builderClass = Class.forName(builderName);
      } catch (ClassNotFoundException e) {
        LOG.debug("{} not available, will not set replicate when creating output stream",
          builderName);
      }
      Method replicateMethod = null;
      if (builderClass != null) {
        try {
          replicateMethod = builderClass.getMethod("replicate");
          LOG.debug("Using builder API via reflection for DFS file creation replicate flag.");
        } catch (NoSuchMethodException e) {
          LOG.debug("Could not find replicate method on builder; will not set replicate when"
            + " creating output stream", e);
        }
      }
      Method noLocalWriteMethod = null;
      if (builderClass != null) {
        try {
          noLocalWriteMethod = builderClass.getMethod("noLocalWrite");
          LOG.debug("Using builder API via reflection for DFS file creation noLocalWrite flag.");
        } catch (NoSuchMethodException e) {
          LOG.debug("Could not find noLocalWrite method on builder; will not set noLocalWrite when"
            + " creating output stream", e);
        }
      }
      BUILDER = builderClass;
      REPLICATE = replicateMethod;
      NO_LOCAL_WRITE = noLocalWriteMethod;
    }

    /**
     * Attempt to use builder API via reflection to call the replicate method on the given builder.
     */
    static void replicate(FSDataOutputStreamBuilder<?, ?> builder) {
      if (BUILDER != null && REPLICATE != null && BUILDER.isAssignableFrom(builder.getClass())) {
        try {
          REPLICATE.invoke(builder);
        } catch (IllegalAccessException | InvocationTargetException e) {
          // Should have caught this failure during initialization, so log full trace here
          LOG.warn("Couldn't use reflection with builder API", e);
        }
      }
    }

    static void noLocalWrite(FSDataOutputStreamBuilder<?, ?> builder) {
      if (
        BUILDER != null && NO_LOCAL_WRITE != null && BUILDER.isAssignableFrom(builder.getClass())
      ) {
        try {
          NO_LOCAL_WRITE.invoke(builder);
        } catch (IllegalAccessException | InvocationTargetException e) {
          // Should have caught this failure during initialization, so log full trace here
          LOG.warn("Couldn't use reflection with builder API", e);
        }
      }
    }
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p/>
   * Will not attempt to enable replication when passed an HFileSystem.
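   * <p/>
   * A minimal usage sketch (assumes {@code fs} is already set up; the path is hypothetical):
   *
   * <pre>
   * try (FSDataOutputStream out =
   *   CommonFSUtils.createForWal(fs, new Path("/hbase/WALs/server1/wal.0"), true)) {
   *   // write WAL entries ...
   * }
   * </pre>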
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite)
    throws IOException {
    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite);
    DfsBuilderUtility.replicate(builder);
    return builder.build();
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p/>
   * Will not attempt to enable replication when passed an HFileSystem.
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite,
    int bufferSize, short replication, long blockSize, boolean noLocalWrite, boolean isRecursive)
    throws IOException {
    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite)
      .bufferSize(bufferSize).replication(replication).blockSize(blockSize);
    if (isRecursive) {
      builder.recursive();
    }
    DfsBuilderUtility.replicate(builder);
    if (noLocalWrite) {
      DfsBuilderUtility.noLocalWrite(builder);
    }
    return builder.build();
  }

  /**
   * Helper exception for cases where the place where we need to check a stream capability is not
   * the place where we have the context needed to explain the impact and mitigation of its absence.
   */
  public static class StreamLacksCapabilityException extends Exception {
    public StreamLacksCapabilityException(String message, Throwable cause) {
      super(message, cause);
    }

    public StreamLacksCapabilityException(String message) {
      super(message);
    }
  }
}