001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
021import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET;
022
023import edu.umd.cs.findbugs.annotations.CheckForNull;
024import java.io.ByteArrayInputStream;
025import java.io.DataInputStream;
026import java.io.EOFException;
027import java.io.FileNotFoundException;
028import java.io.IOException;
029import java.io.InterruptedIOException;
030import java.lang.reflect.InvocationTargetException;
031import java.lang.reflect.Method;
032import java.net.InetSocketAddress;
033import java.net.URI;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.Collections;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.Iterator;
041import java.util.List;
042import java.util.Locale;
043import java.util.Map;
044import java.util.Set;
045import java.util.Vector;
046import java.util.concurrent.ConcurrentHashMap;
047import java.util.concurrent.ExecutionException;
048import java.util.concurrent.ExecutorService;
049import java.util.concurrent.Executors;
050import java.util.concurrent.Future;
051import java.util.concurrent.FutureTask;
052import java.util.concurrent.ThreadPoolExecutor;
053import java.util.concurrent.TimeUnit;
054import java.util.regex.Pattern;
055import org.apache.commons.lang3.ArrayUtils;
056import org.apache.hadoop.conf.Configuration;
057import org.apache.hadoop.fs.BlockLocation;
058import org.apache.hadoop.fs.FSDataInputStream;
059import org.apache.hadoop.fs.FSDataOutputStream;
060import org.apache.hadoop.fs.FileStatus;
061import org.apache.hadoop.fs.FileSystem;
062import org.apache.hadoop.fs.FileUtil;
063import org.apache.hadoop.fs.Path;
064import org.apache.hadoop.fs.PathFilter;
065import org.apache.hadoop.fs.StorageType;
066import org.apache.hadoop.fs.permission.FsPermission;
067import org.apache.hadoop.hbase.ClusterId;
068import org.apache.hadoop.hbase.HColumnDescriptor;
069import org.apache.hadoop.hbase.HConstants;
070import org.apache.hadoop.hbase.HDFSBlocksDistribution;
071import org.apache.hadoop.hbase.TableName;
072import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
073import org.apache.hadoop.hbase.client.RegionInfo;
074import org.apache.hadoop.hbase.client.RegionInfoBuilder;
075import org.apache.hadoop.hbase.exceptions.DeserializationException;
076import org.apache.hadoop.hbase.fs.HFileSystem;
077import org.apache.hadoop.hbase.io.HFileLink;
078import org.apache.hadoop.hbase.master.HMaster;
079import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
080import org.apache.hadoop.hdfs.DFSClient;
081import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
082import org.apache.hadoop.hdfs.DFSUtil;
083import org.apache.hadoop.hdfs.DistributedFileSystem;
084import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
085import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
086import org.apache.hadoop.hdfs.protocol.LocatedBlock;
087import org.apache.hadoop.io.IOUtils;
088import org.apache.hadoop.ipc.RemoteException;
089import org.apache.hadoop.util.StringUtils;
090import org.apache.yetus.audience.InterfaceAudience;
091import org.slf4j.Logger;
092import org.slf4j.LoggerFactory;
093
094import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
095import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
096import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
097import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
098import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
099
100import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
102
103/**
104 * Utility methods for interacting with the underlying file system.
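 * <p>
 * A typical startup-time sketch (illustrative only; the configuration is assumed to be supplied by
 * the caller):
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * Path rootDir = CommonFSUtils.getRootDir(conf);
 * FileSystem fs = rootDir.getFileSystem(conf);
 * FSUtils.checkFileSystemAvailable(fs);
 * FSUtils.waitOnSafeMode(conf, 1000);
 * }</pre>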
105 */
106@InterfaceAudience.Private
107public final class FSUtils {
108  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
109
110  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
111  private static final int DEFAULT_THREAD_POOLSIZE = 2;
112
113  /** Set to true on Windows platforms */
114  // currently only used in testing. TODO refactor into a test class
115  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
116
117  private FSUtils() {
118  }
119
  /** Returns true if <code>fs</code> is an instance of DistributedFileSystem. */
121  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
122    FileSystem fileSystem = fs;
123    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
124    // Check its backing fs for dfs-ness.
125    if (fs instanceof HFileSystem) {
126      fileSystem = ((HFileSystem) fs).getBackingFs();
127    }
128    return fileSystem instanceof DistributedFileSystem;
129  }
130
  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider the scheme; i.e. if the schemes differ but the path or subpath matches, the two will
   * equate.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail     Path expected to be the tail of <code>pathToSearch</code>.
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
138  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
139    Path tailPath = pathTail;
140    String tailName;
141    Path toSearch = pathToSearch;
142    String toSearchName;
143    boolean result = false;
144
145    if (pathToSearch.depth() != pathTail.depth()) {
146      return false;
147    }
148
149    do {
150      tailName = tailPath.getName();
151      if (tailName == null || tailName.isEmpty()) {
152        result = true;
153        break;
154      }
155      toSearchName = toSearch.getName();
156      if (toSearchName == null || toSearchName.isEmpty()) {
157        break;
158      }
159      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
160      tailPath = tailPath.getParent();
161      toSearch = toSearch.getParent();
162    } while (tailName.equals(toSearchName));
163    return result;
164  }
165
166  /**
167   * Delete the region directory if exists.
168   * @return True if deleted the region directory.
169   */
170  public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri)
171    throws IOException {
172    Path rootDir = CommonFSUtils.getRootDir(conf);
173    FileSystem fs = rootDir.getFileSystem(conf);
174    return CommonFSUtils.deleteDirectory(fs,
175      new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
176  }
177
178  /**
179   * Create the specified file on the filesystem. By default, this will:
180   * <ol>
181   * <li>overwrite the file if it exists</li>
182   * <li>apply the umask in the configuration (if it is enabled)</li>
183   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the column family replication configured in
   * {@link ColumnFamilyDescriptorBuilder#DFS_REPLICATION}, or the filesystem default if that value
   * is {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
186   * <li>use the default block size</li>
187   * <li>not track progress</li>
188   * </ol>
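   * <p>
   * A minimal usage sketch (the <code>favoredNodes</code> array, target <code>path</code> and
   * <code>payload</code> bytes are assumed to be supplied by the caller; pass <code>null</code>
   * for favored nodes if there are none):
   *
   * <pre>{@code
   * FSDataOutputStream out =
   *   FSUtils.create(conf, fs, path, FsPermission.getFileDefault(), favoredNodes);
   * try {
   *   out.write(payload);
   * } finally {
   *   out.close();
   * }
   * }</pre>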
   * @param conf         the configuration
190   * @param fs           {@link FileSystem} on which to write the file
191   * @param path         {@link Path} to the file to write
192   * @param perm         permissions
193   * @param favoredNodes favored data nodes
194   * @return output stream to the created file
195   * @throws IOException if the file cannot be created
196   */
197  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
198    FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
199    if (fs instanceof HFileSystem) {
200      FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
201      if (backingFs instanceof DistributedFileSystem) {
202        short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION,
203          String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION)));
204        DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
205          ((DistributedFileSystem) backingFs).createFile(path).recursive().permission(perm)
206            .create();
207        if (favoredNodes != null) {
208          builder.favoredNodes(favoredNodes);
209        }
210        if (replication > 0) {
211          builder.replication(replication);
212        }
213        return builder.build();
      }
    }
217    return CommonFSUtils.create(fs, path, perm, true);
218  }
219
220  /**
221   * Checks to see if the specified file system is available
222   * @param fs filesystem
223   * @throws IOException e
224   */
225  public static void checkFileSystemAvailable(final FileSystem fs) throws IOException {
226    if (!(fs instanceof DistributedFileSystem)) {
227      return;
228    }
229    IOException exception = null;
230    DistributedFileSystem dfs = (DistributedFileSystem) fs;
231    try {
232      if (dfs.exists(new Path("/"))) {
233        return;
234      }
235    } catch (IOException e) {
236      exception = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
237    }
238    try {
239      fs.close();
240    } catch (Exception e) {
241      LOG.error("file system close failed: ", e);
242    }
243    throw new IOException("File system is not available", exception);
244  }
245
246  /**
247   * Inquire the Active NameNode's safe mode status.
248   * @param dfs A DistributedFileSystem object representing the underlying HDFS.
249   * @return whether we're in safe mode
250   */
251  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
252    return dfs.setSafeMode(SAFEMODE_GET, true);
253  }
254
255  /**
256   * Check whether dfs is in safemode.
257   */
258  public static void checkDfsSafeMode(final Configuration conf) throws IOException {
259    boolean isInSafeMode = false;
260    FileSystem fs = FileSystem.get(conf);
261    if (fs instanceof DistributedFileSystem) {
262      DistributedFileSystem dfs = (DistributedFileSystem) fs;
263      isInSafeMode = isInSafeMode(dfs);
264    }
265    if (isInSafeMode) {
266      throw new IOException("File system is in safemode, it can't be written now");
267    }
268  }
269
270  /**
   * Reads the current version of the file system from the hbase.version file under the root dir.
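   * <p>
   * For example (a sketch; <code>rootdir</code> is the configured hbase.rootdir):
   *
   * <pre>{@code
   * String version = FSUtils.getVersion(fs, rootdir);
   * if (version == null) {
   *   // no hbase.version file yet; a fresh install would write one via setVersion(fs, rootdir)
   * }
   * }</pre>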
272   * @param fs      filesystem object
273   * @param rootdir root hbase directory
274   * @return null if no version file exists, version string otherwise
275   * @throws IOException              if the version file fails to open
276   * @throws DeserializationException if the version data cannot be translated into a version
277   */
278  public static String getVersion(FileSystem fs, Path rootdir)
279    throws IOException, DeserializationException {
280    final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
281    FileStatus[] status = null;
282    try {
283      // hadoop 2.0 throws FNFE if directory does not exist.
284      // hadoop 1.0 returns null if directory does not exist.
285      status = fs.listStatus(versionFile);
286    } catch (FileNotFoundException fnfe) {
287      return null;
288    }
289    if (ArrayUtils.getLength(status) == 0) {
290      return null;
291    }
292    String version = null;
293    byte[] content = new byte[(int) status[0].getLen()];
294    FSDataInputStream s = fs.open(versionFile);
295    try {
296      IOUtils.readFully(s, content, 0, content.length);
297      if (ProtobufUtil.isPBMagicPrefix(content)) {
298        version = parseVersionFrom(content);
299      } else {
        // Presume it is in pre-pb format.
301        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
302          version = dis.readUTF();
303        }
304      }
305    } catch (EOFException eof) {
306      LOG.warn("Version file was empty, odd, will try to set it.");
307    } finally {
308      s.close();
309    }
310    return version;
311  }
312
313  /**
314   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
315   * @param bytes The byte content of the hbase.version file
316   * @return The version found in the file as a String
317   * @throws DeserializationException if the version data cannot be translated into a version
318   */
319  static String parseVersionFrom(final byte[] bytes) throws DeserializationException {
320    ProtobufUtil.expectPBMagicPrefix(bytes);
321    int pblen = ProtobufUtil.lengthOfPBMagic();
322    FSProtos.HBaseVersionFileContent.Builder builder =
323      FSProtos.HBaseVersionFileContent.newBuilder();
324    try {
325      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
326      return builder.getVersion();
327    } catch (IOException e) {
328      // Convert
329      throw new DeserializationException(e);
330    }
331  }
332
333  /**
334   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
335   * @param version Version to persist
336   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a
337   *         prefix.
338   */
339  static byte[] toVersionByteArray(final String version) {
340    FSProtos.HBaseVersionFileContent.Builder builder =
341      FSProtos.HBaseVersionFileContent.newBuilder();
342    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
343  }
344
345  /**
346   * Verifies current version of file system
347   * @param fs      file system
348   * @param rootdir root directory of HBase installation
349   * @param message if true, issues a message on System.out
350   * @throws IOException              if the version file cannot be opened
351   * @throws DeserializationException if the contents of the version file cannot be parsed
352   */
353  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
354    throws IOException, DeserializationException {
355    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
356  }
357
358  /**
359   * Verifies current version of file system
360   * @param fs      file system
361   * @param rootdir root directory of HBase installation
362   * @param message if true, issues a message on System.out
363   * @param wait    wait interval
364   * @param retries number of times to retry
365   * @throws IOException              if the version file cannot be opened
366   * @throws DeserializationException if the contents of the version file cannot be parsed
367   */
368  public static void checkVersion(FileSystem fs, Path rootdir, boolean message, int wait,
369    int retries) throws IOException, DeserializationException {
370    String version = getVersion(fs, rootdir);
371    String msg;
372    if (version == null) {
373      if (!metaRegionExists(fs, rootdir)) {
374        // rootDir is empty (no version file and no root region)
375        // just create new version file (HBASE-1195)
376        setVersion(fs, rootdir, wait, retries);
377        return;
378      } else {
379        msg = "hbase.version file is missing. Is your hbase.rootdir valid? "
380          + "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. "
381          + "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
382      }
383    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
384      return;
385    } else {
386      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version
387        + " but software requires version " + HConstants.FILE_SYSTEM_VERSION
388        + ". Consult http://hbase.apache.org/book.html for further information about "
389        + "upgrading HBase.";
390    }
391
    // The on-disk version is deprecated and requires migration.
393    // Output on stdout so user sees it in terminal.
394    if (message) {
395      System.out.println("WARNING! " + msg);
396    }
397    throw new FileSystemVersionException(msg);
398  }
399
400  /**
401   * Sets version of file system
402   * @param fs      filesystem object
403   * @param rootdir hbase root
404   * @throws IOException e
405   */
406  public static void setVersion(FileSystem fs, Path rootdir) throws IOException {
407    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
408      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
409  }
410
411  /**
412   * Sets version of file system
413   * @param fs      filesystem object
414   * @param rootdir hbase root
415   * @param wait    time to wait for retry
416   * @param retries number of times to retry before failing
417   * @throws IOException e
418   */
419  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
420    throws IOException {
421    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
422  }
423
424  /**
425   * Sets version of file system
426   * @param fs      filesystem object
427   * @param rootdir hbase root directory
428   * @param version version to set
429   * @param wait    time to wait for retry
430   * @param retries number of times to retry before throwing an IOException
431   * @throws IOException e
432   */
433  public static void setVersion(FileSystem fs, Path rootdir, String version, int wait, int retries)
434    throws IOException {
435    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
436    Path tempVersionFile = new Path(rootdir,
437      HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.VERSION_FILE_NAME);
438    while (true) {
439      try {
440        // Write the version to a temporary file
441        FSDataOutputStream s = fs.create(tempVersionFile);
442        try {
443          s.write(toVersionByteArray(version));
444          s.close();
445          s = null;
446          // Move the temp version file to its normal location. Returns false
447          // if the rename failed. Throw an IOE in that case.
448          if (!fs.rename(tempVersionFile, versionFile)) {
449            throw new IOException("Unable to move temp version file to " + versionFile);
450          }
451        } finally {
          // Cleaning up the temporary file if the rename failed would be trying
          // too hard. We'll unconditionally create it again the next time
          // through anyway; files are overwritten by default by create().
455
456          // Attempt to close the stream on the way out if it is still open.
457          try {
458            if (s != null) s.close();
459          } catch (IOException ignore) {
460          }
461        }
462        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
463        return;
464      } catch (IOException e) {
465        if (retries > 0) {
466          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
467          fs.delete(versionFile, false);
468          try {
469            if (wait > 0) {
470              Thread.sleep(wait);
471            }
472          } catch (InterruptedException ie) {
473            throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
474          }
475          retries--;
476        } else {
477          throw e;
478        }
479      }
480    }
481  }
482
483  /**
484   * Checks that a cluster ID file exists in the HBase root directory
485   * @param fs      the root directory FileSystem
486   * @param rootdir the HBase root directory in HDFS
487   * @param wait    how long to wait between retries
488   * @return <code>true</code> if the file exists, otherwise <code>false</code>
489   * @throws IOException if checking the FileSystem fails
490   */
491  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, long wait)
492    throws IOException {
493    while (true) {
494      try {
495        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
496        return fs.exists(filePath);
497      } catch (IOException ioe) {
498        if (wait > 0L) {
499          LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
500          try {
501            Thread.sleep(wait);
502          } catch (InterruptedException e) {
503            Thread.currentThread().interrupt();
504            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
505          }
506        } else {
507          throw ioe;
508        }
509      }
510    }
511  }
512
513  /**
514   * Returns the value of the unique cluster ID stored for this HBase instance.
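   * <p>
   * For example (a sketch only):
   *
   * <pre>{@code
   * ClusterId clusterId = FSUtils.getClusterId(fs, rootdir);
   * if (clusterId == null) {
   *   // no hbase.id file yet; the master writes one at startup via setClusterId
   * }
   * }</pre>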
515   * @param fs      the root directory FileSystem
516   * @param rootdir the path to the HBase root directory
517   * @return the unique cluster identifier
518   * @throws IOException if reading the cluster ID file fails
519   */
520  public static ClusterId getClusterId(FileSystem fs, Path rootdir) throws IOException {
521    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
522    ClusterId clusterId = null;
523    FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null;
524    if (status != null) {
525      int len = Ints.checkedCast(status.getLen());
526      byte[] content = new byte[len];
527      FSDataInputStream in = fs.open(idPath);
528      try {
529        in.readFully(content);
530      } catch (EOFException eof) {
531        LOG.warn("Cluster ID file {} is empty", idPath);
532      } finally {
533        in.close();
534      }
535      try {
536        clusterId = ClusterId.parseFrom(content);
537      } catch (DeserializationException e) {
538        throw new IOException("content=" + Bytes.toString(content), e);
539      }
540      // If not pb'd, make it so.
541      if (!ProtobufUtil.isPBMagicPrefix(content)) {
542        String cid = null;
543        in = fs.open(idPath);
544        try {
545          cid = in.readUTF();
546          clusterId = new ClusterId(cid);
547        } catch (EOFException eof) {
548          LOG.warn("Cluster ID file {} is empty", idPath);
549        } finally {
550          in.close();
551        }
552        rewriteAsPb(fs, rootdir, idPath, clusterId);
553      }
554      return clusterId;
555    } else {
556      LOG.warn("Cluster ID file does not exist at {}", idPath);
557    }
558    return clusterId;
559  }
560
  /**
   * Rewrites the cluster ID file at <code>p</code> in protobuf format: the old file is moved
   * aside, a new one is written via {@link #setClusterId(FileSystem, Path, ClusterId, int)}, and
   * the moved-aside copy is then deleted.
   */
563  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
564    final ClusterId cid) throws IOException {
565    // Rewrite the file as pb. Move aside the old one first, write new
566    // then delete the moved-aside file.
567    Path movedAsideName = new Path(p + "." + EnvironmentEdgeManager.currentTime());
568    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
569    setClusterId(fs, rootdir, cid, 100);
570    if (!fs.delete(movedAsideName, false)) {
571      throw new IOException("Failed delete of " + movedAsideName);
572    }
573    LOG.debug("Rewrote the hbase.id file as pb");
574  }
575
576  /**
577   * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root
578   * directory
579   * @param fs        the root directory FileSystem
580   * @param rootdir   the path to the HBase root directory
581   * @param clusterId the unique identifier to store
582   * @param wait      how long (in milliseconds) to wait between retries
   * @throws IOException if writing to the FileSystem fails and no wait value was given
584   */
585  public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId, int wait)
586    throws IOException {
587    while (true) {
588      try {
589        Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
590        Path tempIdFile = new Path(rootdir,
591          HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
592        // Write the id file to a temporary location
593        FSDataOutputStream s = fs.create(tempIdFile);
594        try {
595          s.write(clusterId.toByteArray());
596          s.close();
597          s = null;
598          // Move the temporary file to its normal location. Throw an IOE if
599          // the rename failed
600          if (!fs.rename(tempIdFile, idFile)) {
601            throw new IOException("Unable to move temp version file to " + idFile);
602          }
603        } finally {
604          // Attempt to close the stream if still open on the way out
605          try {
606            if (s != null) s.close();
607          } catch (IOException ignore) {
608          }
609        }
610        if (LOG.isDebugEnabled()) {
611          LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
612        }
613        return;
614      } catch (IOException ioe) {
615        if (wait > 0) {
616          LOG.warn("Unable to create cluster ID file in " + rootdir.toString() + ", retrying in "
617            + wait + "msec: " + StringUtils.stringifyException(ioe));
618          try {
619            Thread.sleep(wait);
620          } catch (InterruptedException e) {
621            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
622          }
623        } else {
624          throw ioe;
625        }
626      }
627    }
628  }
629
630  /**
631   * If DFS, check safe mode and if so, wait until we clear it.
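   * <p>
   * For example, a caller starting up might block until HDFS is writable (sketch; the 10 second
   * poll interval is arbitrary):
   *
   * <pre>{@code
   * FSUtils.waitOnSafeMode(conf, 10 * 1000);
   * }</pre>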
632   * @param conf configuration
633   * @param wait Sleep between retries
634   * @throws IOException e
635   */
636  public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException {
637    FileSystem fs = FileSystem.get(conf);
638    if (!(fs instanceof DistributedFileSystem)) return;
639    DistributedFileSystem dfs = (DistributedFileSystem) fs;
640    // Make sure dfs is not in safe mode
641    while (isInSafeMode(dfs)) {
642      LOG.info("Waiting for dfs to exit safe mode...");
643      try {
644        Thread.sleep(wait);
645      } catch (InterruptedException e) {
646        Thread.currentThread().interrupt();
647        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
648      }
649    }
650  }
651
652  /**
653   * Checks if meta region exists
654   * @param fs      file system
655   * @param rootDir root directory of HBase installation
656   * @return true if exists
657   */
658  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
659    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
660    return fs.exists(metaRegionDir);
661  }
662
663  /**
664   * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams are
665   * backed by a series of LocatedBlocks, which are fetched periodically from the namenode. This
   * method retrieves those blocks from the input stream and uses them to calculate an
   * HDFSBlocksDistribution. The underlying method in DFSInputStream does attempt to use locally
   * cached blocks, but may hit the namenode if the cache is determined to be incomplete. The
   * method also makes copies of all LocatedBlocks rather than returning the underlying blocks
   * themselves.
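   * <p>
   * For example, a caller holding an open stream might compute locality for the local host
   * (sketch; <code>in</code> and <code>localHostName</code> are assumed to be supplied by the
   * caller):
   *
   * <pre>{@code
   * HDFSBlocksDistribution dist = FSUtils.computeHDFSBlocksDistribution(in);
   * float locality = dist.getBlockLocalityIndex(localHostName);
   * }</pre>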
671   */
672  public static HDFSBlocksDistribution
673    computeHDFSBlocksDistribution(HdfsDataInputStream inputStream) throws IOException {
674    List<LocatedBlock> blocks = inputStream.getAllBlocks();
675    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
676    for (LocatedBlock block : blocks) {
677      String[] hosts = getHostsForLocations(block);
678      long len = block.getBlockSize();
679      StorageType[] storageTypes = block.getStorageTypes();
680      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
681    }
682    return blocksDistribution;
683  }
684
685  private static String[] getHostsForLocations(LocatedBlock block) {
686    DatanodeInfo[] locations = getLocatedBlockLocations(block);
687    String[] hosts = new String[locations.length];
688    for (int i = 0; i < hosts.length; i++) {
689      hosts[i] = locations[i].getHostName();
690    }
691    return hosts;
692  }
693
694  /**
695   * Compute HDFS blocks distribution of a given file, or a portion of the file
696   * @param fs     file system
697   * @param status file status of the file
698   * @param start  start position of the portion
699   * @param length length of the portion
700   * @return The HDFS blocks distribution
701   */
702  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs,
703    FileStatus status, long start, long length) throws IOException {
704    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
705    BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length);
706    addToHDFSBlocksDistribution(blocksDistribution, blockLocations);
707    return blocksDistribution;
708  }
709
710  /**
711   * Update blocksDistribution with blockLocations
712   * @param blocksDistribution the hdfs blocks distribution
   * @param blockLocations     an array of block locations
714   */
715  static public void addToHDFSBlocksDistribution(HDFSBlocksDistribution blocksDistribution,
716    BlockLocation[] blockLocations) throws IOException {
717    for (BlockLocation bl : blockLocations) {
718      String[] hosts = bl.getHosts();
719      long len = bl.getLength();
720      StorageType[] storageTypes = bl.getStorageTypes();
721      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
722    }
723  }
724
725  // TODO move this method OUT of FSUtils. No dependencies to HMaster
726  /**
727   * Returns the total overall fragmentation percentage. Includes hbase:meta and -ROOT- as well.
728   * @param master The master defining the HBase root and file system
   * @return The overall fragmentation percentage, or -1 if no tables were found
730   * @throws IOException When scanning the directory fails
731   */
732  public static int getTotalTableFragmentation(final HMaster master) throws IOException {
733    Map<String, Integer> map = getTableFragmentation(master);
734    return map.isEmpty() ? -1 : map.get("-TOTAL-");
735  }
736
737  /**
738   * Runs through the HBase rootdir and checks how many stores for each table have more than one
739   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
740   * stored under the special key "-TOTAL-".
741   * @param master The master defining the HBase root and file system.
742   * @return A map for each table and its percentage (never null).
743   * @throws IOException When scanning the directory fails.
744   */
745  public static Map<String, Integer> getTableFragmentation(final HMaster master)
746    throws IOException {
747    Path path = CommonFSUtils.getRootDir(master.getConfiguration());
748    // since HMaster.getFileSystem() is package private
749    FileSystem fs = path.getFileSystem(master.getConfiguration());
750    return getTableFragmentation(fs, path);
751  }
752
753  /**
754   * Runs through the HBase rootdir and checks how many stores for each table have more than one
755   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
756   * stored under the special key "-TOTAL-".
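   * <p>
   * For example (a sketch; values are whole-number percentages):
   *
   * <pre>{@code
   * Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, rootDir);
   * int overall = frags.get("-TOTAL-");
   * }</pre>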
757   * @param fs           The file system to use
758   * @param hbaseRootDir The root directory to scan
759   * @return A map for each table and its percentage (never null)
760   * @throws IOException When scanning the directory fails
761   */
762  public static Map<String, Integer> getTableFragmentation(final FileSystem fs,
763    final Path hbaseRootDir) throws IOException {
764    Map<String, Integer> frags = new HashMap<>();
765    int cfCountTotal = 0;
766    int cfFragTotal = 0;
767    PathFilter regionFilter = new RegionDirFilter(fs);
768    PathFilter familyFilter = new FamilyDirFilter(fs);
769    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
770    for (Path d : tableDirs) {
771      int cfCount = 0;
772      int cfFrag = 0;
773      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
774      for (FileStatus regionDir : regionDirs) {
775        Path dd = regionDir.getPath();
        // this is a region dir; now look inside it for families
777        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
778        for (FileStatus familyDir : familyDirs) {
779          cfCount++;
780          cfCountTotal++;
781          Path family = familyDir.getPath();
782          // now in family make sure only one file
783          FileStatus[] familyStatus = fs.listStatus(family);
784          if (familyStatus.length > 1) {
785            cfFrag++;
786            cfFragTotal++;
787          }
788        }
789      }
      // compute percentage per table and store in result map
791      frags.put(CommonFSUtils.getTableName(d).getNameAsString(),
792        cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100));
793    }
794    // set overall percentage for all tables
795    frags.put("-TOTAL-",
796      cfCountTotal == 0 ? 0 : Math.round((float) cfFragTotal / cfCountTotal * 100));
797    return frags;
798  }
799
800  /**
801   * A {@link PathFilter} that returns only regular files.
802   */
803  static class FileFilter extends AbstractFileStatusFilter {
804    private final FileSystem fs;
805
806    public FileFilter(final FileSystem fs) {
807      this.fs = fs;
808    }
809
810    @Override
811    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
812      try {
813        return isFile(fs, isDir, p);
814      } catch (IOException e) {
815        LOG.warn("Unable to verify if path={} is a regular file", p, e);
816        return false;
817      }
818    }
819  }
820
821  /**
822   * Directory filter that doesn't include any of the directories in the specified blacklist
823   */
824  public static class BlackListDirFilter extends AbstractFileStatusFilter {
825    private final FileSystem fs;
826    private List<String> blacklist;
827
828    /**
     * Create a filter on the given filesystem with the specified blacklist
830     * @param fs                     filesystem to filter
831     * @param directoryNameBlackList list of the names of the directories to filter. If
832     *                               <tt>null</tt>, all directories are returned
833     */
834    @SuppressWarnings("unchecked")
835    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
836      this.fs = fs;
837      blacklist = (List<String>) (directoryNameBlackList == null
838        ? Collections.emptyList()
839        : directoryNameBlackList);
840    }
841
842    @Override
843    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
844      if (!isValidName(p.getName())) {
845        return false;
846      }
847
848      try {
849        return isDirectory(fs, isDir, p);
850      } catch (IOException e) {
851        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
852          + " Returning 'not valid' and continuing.", p, e);
853        return false;
854      }
855    }
856
857    protected boolean isValidName(final String name) {
858      return !blacklist.contains(name);
859    }
860  }
861
862  /**
863   * A {@link PathFilter} that only allows directories.
864   */
865  public static class DirFilter extends BlackListDirFilter {
866
867    public DirFilter(FileSystem fs) {
868      super(fs, null);
869    }
870  }
871
872  /**
873   * A {@link PathFilter} that returns usertable directories. To get all directories use the
874   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
875   */
876  public static class UserTableDirFilter extends BlackListDirFilter {
877    public UserTableDirFilter(FileSystem fs) {
878      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
879    }
880
881    @Override
882    protected boolean isValidName(final String name) {
883      if (!super.isValidName(name)) return false;
884
885      try {
886        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
887      } catch (IllegalArgumentException e) {
888        LOG.info("Invalid table name: {}", name);
889        return false;
890      }
891      return true;
892    }
893  }
894
895  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
896    throws IOException {
897    List<Path> tableDirs = new ArrayList<>();
898    Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR);
899    if (fs.exists(baseNamespaceDir)) {
900      for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) {
901        tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
902      }
903    }
904    return tableDirs;
905  }
906
907  /**
   * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase folders
   *         such as .logs, .oldlogs and .corrupt.
910   */
911  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
912    throws IOException {
913    // presumes any directory under hbase.rootdir is a table
914    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
915    List<Path> tabledirs = new ArrayList<>(dirs.length);
916    for (FileStatus dir : dirs) {
917      tabledirs.add(dir.getPath());
918    }
919    return tabledirs;
920  }
921
922  /**
   * Filter for region directories, i.e. directories whose names look like encoded region names.
924   */
925  public static class RegionDirFilter extends AbstractFileStatusFilter {
926    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
927    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
928    final FileSystem fs;
929
930    public RegionDirFilter(FileSystem fs) {
931      this.fs = fs;
932    }
933
934    @Override
935    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
936      if (!regionDirPattern.matcher(p.getName()).matches()) {
937        return false;
938      }
939
940      try {
941        return isDirectory(fs, isDir, p);
942      } catch (IOException ioe) {
943        // Maybe the file was moved or the fs was disconnected.
944        LOG.warn("Skipping file {} due to IOException", p, ioe);
945        return false;
946      }
947    }
948  }
949
950  /**
951   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
952   * .tableinfo
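   * <p>
   * For example (a sketch; the table name is illustrative):
   *
   * <pre>{@code
   * Path tableDir = CommonFSUtils.getTableDir(rootDir, TableName.valueOf("myTable"));
   * List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
   * }</pre>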
953   * @param fs       A file system for the Path
954   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
955   * @return List of paths to valid region directories in table dir.
956   */
957  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir)
958    throws IOException {
959    // assumes we are in a table dir.
960    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
961    if (rds == null) {
962      return Collections.emptyList();
963    }
964    List<Path> regionDirs = new ArrayList<>(rds.size());
965    for (FileStatus rdfs : rds) {
966      Path rdPath = rdfs.getPath();
967      regionDirs.add(rdPath);
968    }
969    return regionDirs;
970  }
971
972  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
973    return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region);
974  }
975
976  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
977    return getRegionDirFromTableDir(tableDir,
978      ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
979  }
980
981  public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
982    return new Path(tableDir, encodedRegionName);
983  }
984
985  /**
986   * Filter for all dirs that are legal column family names. This is generally used for colfam dirs
987   * &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
988   */
989  public static class FamilyDirFilter extends AbstractFileStatusFilter {
990    final FileSystem fs;
991
992    public FamilyDirFilter(FileSystem fs) {
993      this.fs = fs;
994    }
995
996    @Override
997    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
998      try {
999        // throws IAE if invalid
1000        HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
1001      } catch (IllegalArgumentException iae) {
1002        // path name is an invalid family name and thus is excluded.
1003        return false;
1004      }
1005
1006      try {
1007        return isDirectory(fs, isDir, p);
1008      } catch (IOException ioe) {
1009        // Maybe the file was moved or the fs was disconnected.
1010        LOG.warn("Skipping file {} due to IOException", p, ioe);
1011        return false;
1012      }
1013    }
1014  }
1015
1016  /**
1017   * Given a particular region dir, return all the familydirs inside it
1018   * @param fs        A file system for the Path
1019   * @param regionDir Path to a specific region directory
1020   * @return List of paths to valid family directories in region dir.
1021   */
1022  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir)
1023    throws IOException {
1024    // assumes we are in a region dir.
1025    return getFilePaths(fs, regionDir, new FamilyDirFilter(fs));
1026  }
1027
1028  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir)
1029    throws IOException {
1030    return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs));
1031  }
1032
1033  public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir)
1034    throws IOException {
1035    return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs));
1036  }
1037
1038  private static List<Path> getFilePaths(final FileSystem fs, final Path dir,
1039    final PathFilter pathFilter) throws IOException {
1040    FileStatus[] fds = fs.listStatus(dir, pathFilter);
1041    List<Path> files = new ArrayList<>(fds.length);
1042    for (FileStatus fdfs : fds) {
1043      Path fdPath = fdfs.getPath();
1044      files.add(fdPath);
1045    }
1046    return files;
1047  }
1048
1049  public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) {
1050    int result = 0;
1051    try {
1052      for (Path familyDir : getFamilyDirs(fs, p)) {
1053        result += getReferenceAndLinkFilePaths(fs, familyDir).size();
1054      }
1055    } catch (IOException e) {
1056      LOG.warn("Error Counting reference files.", e);
1057    }
1058    return result;
1059  }
1060
1061  public static class ReferenceAndLinkFileFilter implements PathFilter {
1062
1063    private final FileSystem fs;
1064
1065    public ReferenceAndLinkFileFilter(FileSystem fs) {
1066      this.fs = fs;
1067    }
1068
1069    @Override
1070    public boolean accept(Path rd) {
1071      try {
1072        // only files can be references.
1073        return !fs.getFileStatus(rd).isDirectory()
1074          && (StoreFileInfo.isReference(rd) || HFileLink.isHFileLink(rd));
1075      } catch (IOException ioe) {
1076        // Maybe the file was moved or the fs was disconnected.
1077        LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1078        return false;
1079      }
1080    }
1081  }
1082
1083  /**
1084   * Filter for HFiles that excludes reference files.
1085   */
1086  public static class HFileFilter extends AbstractFileStatusFilter {
1087    final FileSystem fs;
1088
1089    public HFileFilter(FileSystem fs) {
1090      this.fs = fs;
1091    }
1092
1093    @Override
1094    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1095      if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) {
1096        return false;
1097      }
1098
1099      try {
1100        return isFile(fs, isDir, p);
1101      } catch (IOException ioe) {
1102        // Maybe the file was moved or the fs was disconnected.
1103        LOG.warn("Skipping file {} due to IOException", p, ioe);
1104        return false;
1105      }
1106    }
1107  }
1108
1109  /**
   * Filter for HFileLinks (StoreFiles and HFiles not included). The filter itself does not
   * consider whether a link is a file or not.
1112   */
1113  public static class HFileLinkFilter implements PathFilter {
1114
1115    @Override
1116    public boolean accept(Path p) {
1117      return HFileLink.isHFileLink(p);
1118    }
1119  }
1120
1121  public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1122
1123    private final FileSystem fs;
1124
1125    public ReferenceFileFilter(FileSystem fs) {
1126      this.fs = fs;
1127    }
1128
1129    @Override
1130    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1131      if (!StoreFileInfo.isReference(p)) {
1132        return false;
1133      }
1134
1135      try {
1136        // only files can be references.
1137        return isFile(fs, isDir, p);
1138      } catch (IOException ioe) {
1139        // Maybe the file was moved or the fs was disconnected.
1140        LOG.warn("Skipping file {} due to IOException", p, ioe);
1141        return false;
1142      }
1143    }
1144  }
1145
1146  /**
   * Called every so often by the storefile map builder getTableStoreFilePathMap to report
   * progress.
1148   */
1149  interface ProgressReporter {
1150    /**
1151     * @param status File or directory we are about to process.
1152     */
1153    void progress(FileStatus status);
1154  }
1155
1156  /**
1157   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1158   * names to the full Path. <br>
1159   * Example...<br>
1160   * Key = 3944417774205889744 <br>
1161   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1162   * @param map          map to add values. If null, this method will create and populate one to
1163   *                     return
1164   * @param fs           The file system to use.
1165   * @param hbaseRootDir The root directory to scan.
1166   * @param tableName    name of the table to scan.
1167   * @return Map keyed by StoreFile name with a value of the full Path.
1168   * @throws IOException When scanning the directory fails.
1169   */
1170  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1171    final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1172    throws IOException, InterruptedException {
1173    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
1174      (ProgressReporter) null);
1175  }
1176
1177  /**
1178   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
   * names to the full Path. Note that because this method can be called on a 'live' HBase system,
   * we will skip files that no longer exist by the time we traverse them, and similarly the user
   * of the result needs to consider that some entries in this map may not exist by the time this
   * call completes. <br>
1183   * Example...<br>
1184   * Key = 3944417774205889744 <br>
1185   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1186   * @param resultMap        map to add values. If null, this method will create and populate one to
1187   *                         return
1188   * @param fs               The file system to use.
1189   * @param hbaseRootDir     The root directory to scan.
1190   * @param tableName        name of the table to scan.
1191   * @param sfFilter         optional path filter to apply to store files
1192   * @param executor         optional executor service to parallelize this operation
1193   * @param progressReporter Instance or null; gets called every time we move to new region of
1194   *                         family dir and for each store file.
1195   * @return Map keyed by StoreFile name with a value of the full Path.
1196   * @throws IOException When scanning the directory fails.
1197   * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
1198   */
1199  @Deprecated
1200  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1201    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1202    ExecutorService executor, final HbckErrorReporter progressReporter)
1203    throws IOException, InterruptedException {
1204    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
1205      new ProgressReporter() {
1206        @Override
1207        public void progress(FileStatus status) {
1208          // status is not used in this implementation.
1209          progressReporter.progress();
1210        }
1211      });
1212  }
1213
1214  /**
1215   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
   * names to the full Path. Note that because this method can be called on a 'live' HBase system,
   * we will skip files that no longer exist by the time we traverse them, and similarly the user
   * of the result needs to consider that some entries in this map may not exist by the time this
   * call completes. <br>
1220   * Example...<br>
1221   * Key = 3944417774205889744 <br>
1222   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
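   * <p>
   * A usage sketch with a bounded executor (the pool size and table name are illustrative):
   *
   * <pre>{@code
   * ExecutorService pool = Executors.newFixedThreadPool(8);
   * try {
   *   Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(null, fs, rootDir,
   *     TableName.valueOf("myTable"), null, pool, status -> {
   *       // progress callback; called for each region, family and store file visited
   *     });
   * } finally {
   *   pool.shutdown();
   * }
   * }</pre>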
1223   * @param resultMap        map to add values. If null, this method will create and populate one to
1224   *                         return
1225   * @param fs               The file system to use.
1226   * @param hbaseRootDir     The root directory to scan.
1227   * @param tableName        name of the table to scan.
1228   * @param sfFilter         optional path filter to apply to store files
1229   * @param executor         optional executor service to parallelize this operation
1230   * @param progressReporter Instance or null; gets called every time we move to new region of
1231   *                         family dir and for each store file.
1232   * @return Map keyed by StoreFile name with a value of the full Path.
1233   * @throws IOException          When scanning the directory fails.
1234   * @throws InterruptedException the thread is interrupted, either before or during the activity.
1235   */
1236  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1237    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1238    ExecutorService executor, final ProgressReporter progressReporter)
1239    throws IOException, InterruptedException {
1240
1241    final Map<String, Path> finalResultMap =
1242      resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1243
1244    // only include the directory paths to tables
1245    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
1246    // Inside a table, there are compaction.dir directories to skip. Otherwise, all else
1247    // should be regions.
1248    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1249    final Vector<Exception> exceptions = new Vector<>();
1250
1251    try {
1252      List<FileStatus> regionDirs =
1253        FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1254      if (regionDirs == null) {
1255        return finalResultMap;
1256      }
1257
1258      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1259
1260      for (FileStatus regionDir : regionDirs) {
1261        if (null != progressReporter) {
1262          progressReporter.progress(regionDir);
1263        }
1264        final Path dd = regionDir.getPath();
1265
1266        if (!exceptions.isEmpty()) {
1267          break;
1268        }
1269
1270        Runnable getRegionStoreFileMapCall = new Runnable() {
1271          @Override
1272          public void run() {
1273            try {
1274              HashMap<String, Path> regionStoreFileMap = new HashMap<>();
1275              List<FileStatus> familyDirs =
1276                FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1277              if (familyDirs == null) {
1278                if (!fs.exists(dd)) {
1279                  LOG.warn("Skipping region because it no longer exists: " + dd);
1280                } else {
1281                  LOG.warn("Skipping region because it has no family dirs: " + dd);
1282                }
1283                return;
1284              }
1285              for (FileStatus familyDir : familyDirs) {
1286                if (null != progressReporter) {
1287                  progressReporter.progress(familyDir);
1288                }
1289                Path family = familyDir.getPath();
1290                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1291                  continue;
1292                }
1293                // now in family, iterate over the StoreFiles and
1294                // put in map
1295                FileStatus[] familyStatus = fs.listStatus(family);
1296                for (FileStatus sfStatus : familyStatus) {
1297                  if (null != progressReporter) {
1298                    progressReporter.progress(sfStatus);
1299                  }
1300                  Path sf = sfStatus.getPath();
1301                  if (sfFilter == null || sfFilter.accept(sf)) {
1302                    regionStoreFileMap.put(sf.getName(), sf);
1303                  }
1304                }
1305              }
1306              finalResultMap.putAll(regionStoreFileMap);
1307            } catch (Exception e) {
1308              LOG.error("Could not get region store file map for region: " + dd, e);
1309              exceptions.add(e);
1310            }
1311          }
1312        };
1313
1314        // If executor is available, submit async tasks to exec concurrently, otherwise
1315        // just do serial sync execution
1316        if (executor != null) {
1317          Future<?> future = executor.submit(getRegionStoreFileMapCall);
1318          futures.add(future);
1319        } else {
1320          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1321          future.run();
1322          futures.add(future);
1323        }
1324      }
1325
1326      // Ensure all pending tasks are complete (or that we run into an exception)
1327      for (Future<?> f : futures) {
1328        if (!exceptions.isEmpty()) {
1329          break;
1330        }
1331        try {
1332          f.get();
1333        } catch (ExecutionException e) {
1334          LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1335          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1336        }
1337      }
1338    } catch (IOException e) {
1339      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1340      exceptions.add(e);
1341    } finally {
1342      if (!exceptions.isEmpty()) {
1343        // Just throw the first exception as an indication something bad happened
1344        // Don't need to propagate all the exceptions, we already logged them all anyway
1345        Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class);
1346        throw new IOException(exceptions.firstElement());
1347      }
1348    }
1349
1350    return finalResultMap;
1351  }
1352
1353  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1354    int result = 0;
1355    try {
1356      for (Path familyDir : getFamilyDirs(fs, p)) {
1357        result += getReferenceFilePaths(fs, familyDir).size();
1358      }
1359    } catch (IOException e) {
1360      LOG.warn("Error counting reference files", e);
1361    }
1362    return result;
1363  }
1364
1365  /**
1366   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1367   * the full Path. <br>
1368   * Example...<br>
1369   * Key = 3944417774205889744 <br>
1370   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
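   * <p>
   * A minimal usage sketch (the key shown is illustrative):
   *
   * <pre>
   * {@code
   * Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir);
   * Path fullPath = storeFiles.get("3944417774205889744");
   * }
   * </pre>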
1371   * @param fs           The file system to use.
1372   * @param hbaseRootDir The root directory to scan.
1373   * @return Map keyed by StoreFile name with a value of the full Path.
1374   * @throws IOException When scanning the directory fails.
1375   */
1376  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1377    final Path hbaseRootDir) throws IOException, InterruptedException {
1378    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null);
1379  }
1380
1381  /**
1382   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1383   * the full Path. <br>
1384   * Example...<br>
1385   * Key = 3944417774205889744 <br>
1386   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1387   * @param fs               The file system to use.
1388   * @param hbaseRootDir     The root directory to scan.
1389   * @param sfFilter         optional path filter to apply to store files
1390   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *                         family dir and for each store file.
1393   * @return Map keyed by StoreFile name with a value of the full Path.
1394   * @throws IOException When scanning the directory fails.
   * @deprecated Since 2.3.0. Will be removed in HBase 4. Use
   *             {@link #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
   *             instead.
1397   */
1398  @Deprecated
1399  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1400    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1401    HbckErrorReporter progressReporter) throws IOException, InterruptedException {
1402    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, new ProgressReporter() {
1403      @Override
1404      public void progress(FileStatus status) {
1405        // status is not used in this implementation.
1406        progressReporter.progress();
1407      }
1408    });
1409  }
1410
1411  /**
1412   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1413   * the full Path. <br>
1414   * Example...<br>
1415   * Key = 3944417774205889744 <br>
1416   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
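   * <p>
   * A usage sketch with a filter and an executor (the pool size and filter are illustrative; the
   * cast disambiguates from the deprecated {@code HbckErrorReporter} overload):
   *
   * <pre>
   * {@code
   * ExecutorService pool = Executors.newFixedThreadPool(8);
   * try {
   *   Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir,
   *     path -> !path.getName().startsWith("."), pool, (ProgressReporter) null);
   * } finally {
   *   pool.shutdown();
   * }
   * }
   * </pre>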
1417   * @param fs               The file system to use.
1418   * @param hbaseRootDir     The root directory to scan.
1419   * @param sfFilter         optional path filter to apply to store files
1420   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *                         family dir and for each store file.
1423   * @return Map keyed by StoreFile name with a value of the full Path.
1424   * @throws IOException When scanning the directory fails.
1425   */
1426  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1427    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1428    ProgressReporter progressReporter) throws IOException, InterruptedException {
1429    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1430
1431    // if this method looks similar to 'getTableFragmentation' that is because
1432    // it was borrowed from it.
1433
1434    // only include the directory paths to tables
1435    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1436      getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
1437        sfFilter, executor, progressReporter);
1438    }
1439    return map;
1440  }
1441
1442  /**
1443   * Filters FileStatuses in an array and returns a list
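   * <p>
   * For example (illustrative; assumes FileStatusFilter's single {@code accept(FileStatus)}
   * method, so a lambda can be used):
   *
   * <pre>
   * {@code
   * List<FileStatus> dirs = FSUtils.filterFileStatuses(statuses, f -> f.isDirectory());
   * }
   * </pre>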
1444   * @param input  An array of FileStatuses
1445   * @param filter A required filter to filter the array
   * @return A list of FileStatuses, or null if the input array is null
1447   */
1448  public static List<FileStatus> filterFileStatuses(FileStatus[] input, FileStatusFilter filter) {
1449    if (input == null) return null;
1450    return filterFileStatuses(Iterators.forArray(input), filter);
1451  }
1452
1453  /**
1454   * Filters FileStatuses in an iterator and returns a list
1455   * @param input  An iterator of FileStatuses
   * @param filter A required filter to filter the iterator
   * @return A list of FileStatuses, or null if the input iterator is null
1458   */
1459  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1460    FileStatusFilter filter) {
1461    if (input == null) return null;
1462    ArrayList<FileStatus> results = new ArrayList<>();
1463    while (input.hasNext()) {
1464      FileStatus f = input.next();
1465      if (filter.accept(f)) {
1466        results.add(f);
1467      }
1468    }
1469    return results;
1470  }
1471
1472  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between Hadoop versions, where Hadoop 1 does not throw a FileNotFoundException and
   * returns an empty FileStatus[], while Hadoop 2 will throw FileNotFoundException.
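   * <p>
   * A null-safe usage sketch (passing a null filter returns all entries):
   *
   * <pre>
   * {@code
   * List<FileStatus> files = FSUtils.listStatusWithStatusFilter(fs, dir, null);
   * if (files != null) {
   *   for (FileStatus f : files) {
   *     // process f
   *   }
   * }
   * }
   * </pre>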
1476   * @param fs     file system
1477   * @param dir    directory
1478   * @param filter file status filter
1479   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
1480   */
1481  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, final Path dir,
1482    final FileStatusFilter filter) throws IOException {
1483    FileStatus[] status = null;
1484    try {
1485      status = fs.listStatus(dir);
1486    } catch (FileNotFoundException fnfe) {
1487      LOG.trace("{} does not exist", dir);
1488      return null;
1489    }
1490
1491    if (ArrayUtils.getLength(status) == 0) {
1492      return null;
1493    }
1494
1495    if (filter == null) {
1496      return Arrays.asList(status);
1497    } else {
1498      List<FileStatus> status2 = filterFileStatuses(status, filter);
1499      if (status2 == null || status2.isEmpty()) {
1500        return null;
1501      } else {
1502        return status2;
1503      }
1504    }
1505  }
1506
1507  /**
   * Scans the root path of the file system to get the degree of locality for each region on each
   * of the servers having at least one block of that region. This is used by the tool
   * {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
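   * <p>
   * A usage sketch (the region name variable is illustrative):
   *
   * <pre>
   * {@code
   * Map<String, Map<String, Float>> locality =
   *   FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
   * Map<String, Float> localityPerServer = locality.get(regionEncodedName);
   * }
   * </pre>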
   * @param conf the configuration to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
1514   */
1515  public static Map<String, Map<String, Float>>
1516    getRegionDegreeLocalityMappingFromFS(final Configuration conf) throws IOException {
1517    return getRegionDegreeLocalityMappingFromFS(conf, null,
      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
  }
1521
1522  /**
   * Scans the root path of the file system to get the degree of locality for each region on each
   * of the servers having at least one block of that region.
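   * <p>
   * For example (the table name and pool size are illustrative):
   *
   * <pre>
   * {@code
   * Map<String, Map<String, Float>> locality =
   *   FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "my_table", 8);
   * }
   * </pre>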
   * @param conf the configuration to use
   * @param desiredTable the table to scan locality for, or null to scan all tables
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
1528   */
1529  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1530    final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException {
1531    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1532    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
1533    return regionDegreeLocalityMapping;
1534  }
1535
1536  /**
   * Scans the root path of the file system to get the degree of locality of each region on each
   * of the servers having at least one block of that region; results are accumulated into
   * {@code regionDegreeLocalityMapping}.
   * @param conf the configuration to use
   * @param desiredTable the table to scan locality for, or null to scan all tables
   * @param threadPoolSize the thread pool size to use
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping, or
   *        null; must be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
1543   */
1544  private static void getRegionLocalityMappingFromFS(final Configuration conf,
1545    final String desiredTable, int threadPoolSize,
1546    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
1547    final FileSystem fs = FileSystem.get(conf);
1548    final Path rootPath = CommonFSUtils.getRootDir(conf);
1549    final long startTime = EnvironmentEdgeManager.currentTime();
1550    final Path queryPath;
1551    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1552    if (null == desiredTable) {
1553      queryPath =
1554        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1555    } else {
1556      queryPath = new Path(
1557        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1558    }
1559
1560    // reject all paths that are not appropriate
1561    PathFilter pathFilter = new PathFilter() {
1562      @Override
1563      public boolean accept(Path path) {
        // this should be a region name; the glob may also match noise entries
1565        if (null == path) {
1566          return false;
1567        }
1568
1569        // no parent?
1570        Path parent = path.getParent();
1571        if (null == parent) {
1572          return false;
1573        }
1574
1575        String regionName = path.getName();
1576        if (null == regionName) {
1577          return false;
1578        }
1579
1580        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1581          return false;
1582        }
1583        return true;
1584      }
1585    };
1586
1587    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1588
1589    if (LOG.isDebugEnabled()) {
1590      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
1591    }
1592
1593    if (null == statusList) {
1594      return;
1595    }
1596
1597    // lower the number of threads in case we have very few expected regions
1598    threadPoolSize = Math.min(threadPoolSize, statusList.length);
1599
1600    // run in multiple threads
1601    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
1602      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
1603        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
1604    try {
1605      // ignore all file status items that are not of interest
1606      for (FileStatus regionStatus : statusList) {
1607        if (null == regionStatus || !regionStatus.isDirectory()) {
1608          continue;
1609        }
1610
1611        final Path regionPath = regionStatus.getPath();
1612        if (null != regionPath) {
1613          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
1614        }
1615      }
1616    } finally {
1617      tpe.shutdown();
1618      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1619        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1620      try {
1621        // here we wait until TPE terminates, which is either naturally or by
1622        // exceptions in the execution of the threads
1623        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
1624          // printing out rough estimate, so as to not introduce
1625          // AtomicInteger
1626          LOG.info("Locality checking is underway: { Scanned Regions : "
1627            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
1628            + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
1629        }
1630      } catch (InterruptedException e) {
1631        Thread.currentThread().interrupt();
1632        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1633      }
1634    }
1635
1636    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info("Scanning DFS for locality info took {}ms", overhead);
1638  }
1639
1640  /**
1641   * Do our short circuit read setup. Checks buffer size to use and whether to do checksumming in
1642   * hbase or hdfs.
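   * <p>
   * A minimal invocation sketch (how the Configuration is obtained is up to the caller):
   *
   * <pre>
   * {@code
   * Configuration conf = HBaseConfiguration.create();
   * FSUtils.setupShortCircuitRead(conf);
   * }
   * </pre>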
1643   */
1644  public static void setupShortCircuitRead(final Configuration conf) {
1645    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1646    boolean shortCircuitSkipChecksum =
1647      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1648    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1649    if (shortCircuitSkipChecksum) {
1650      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not "
1651        + "be set to true."
1652        + (useHBaseChecksum
1653          ? " HBase checksum doesn't require "
1654            + "it, see https://issues.apache.org/jira/browse/HBASE-6868."
1655          : ""));
1656      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
1657    }
1658    checkShortCircuitReadBufferSize(conf);
1659  }
1660
1661  /**
1662   * Check if short circuit read buffer size is set and if not, set it to hbase value.
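   * <p>
   * For example (the value is illustrative), the HBase-level override only applies when the DFS
   * key is not already set:
   *
   * <pre>
   * {@code
   * conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 131072);
   * FSUtils.checkShortCircuitReadBufferSize(conf);
   * // the DFS key now resolves to 131072 unless it was already set
   * }
   * </pre>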
1663   */
1664  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1665    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1666    final int notSet = -1;
1667    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1668    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1669    int size = conf.getInt(dfsKey, notSet);
1670    // If a size is set, return -- we will use it.
1671    if (size != notSet) return;
    // But the short circuit buffer size is normally not set. Put in place the size HBase wants.
1673    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1674    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1675  }
1676
1677  /**
   * Returns the DFSClient DFSHedgedReadMetrics instance, or null if it can't be found or the
   * filesystem is not HDFS.
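   * <p>
   * A usage sketch (assumes an HDFS-backed cluster; {@code getHedgedReadOps()} is one of the
   * DFSHedgedReadMetrics counters):
   *
   * <pre>
   * {@code
   * DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
   * if (metrics != null) {
   *   long hedgedReadOps = metrics.getHedgedReadOps();
   * }
   * }
   * </pre>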
1679   */
1680  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1681    throws IOException {
1682    if (!CommonFSUtils.isHDFS(c)) {
1683      return null;
1684    }
1685    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1686    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1687    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1688    final String name = "getHedgedReadMetrics";
1689    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
1690    Method m;
1691    try {
1692      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException | SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    }
1702    m.setAccessible(true);
1703    try {
1704      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
1705    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
1706      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: "
1707        + e.getMessage());
1708      return null;
1709    }
1710  }
1711
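  /**
   * Recursively copies everything under {@code src} on {@code srcFS} to {@code dst} on
   * {@code dstFS}, copying regular files in parallel on a fixed-size thread pool while directories
   * are created synchronously. All submitted copy tasks are awaited before returning.
   * @return the list of destination paths visited (directories and files)
   * @throws IOException if any copy task fails or the copy is interrupted
   */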
1712  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1713    Configuration conf, int threads) throws IOException {
1714    ExecutorService pool = Executors.newFixedThreadPool(threads);
1715    List<Future<Void>> futures = new ArrayList<>();
1716    List<Path> traversedPaths;
1717    try {
1718      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
1719      for (Future<Void> future : futures) {
1720        future.get();
1721      }
1722    } catch (ExecutionException | InterruptedException | IOException e) {
1723      throw new IOException("Copy snapshot reference files failed", e);
1724    } finally {
1725      pool.shutdownNow();
1726    }
1727    return traversedPaths;
1728  }
1729
1730  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1731    Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
1732    List<Path> traversedPaths = new ArrayList<>();
1733    traversedPaths.add(dst);
1734    FileStatus currentFileStatus = srcFS.getFileStatus(src);
1735    if (currentFileStatus.isDirectory()) {
1736      if (!dstFS.mkdirs(dst)) {
1737        throw new IOException("Create directory failed: " + dst);
1738      }
1739      FileStatus[] subPaths = srcFS.listStatus(src);
1740      for (FileStatus subPath : subPaths) {
1741        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
1742          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
1743      }
1744    } else {
1745      Future<Void> future = pool.submit(() -> {
1746        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
1747        return null;
1748      });
1749      futures.add(future);
1750    }
1751    return traversedPaths;
1752  }
1753
  /** Returns a set containing all NameNode addresses of the given fs. */
1755  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
1756    Configuration conf) {
1757    Set<InetSocketAddress> addresses = new HashSet<>();
1758    String serviceName = fs.getCanonicalServiceName();
1759
1760    if (serviceName.startsWith("ha-hdfs")) {
1761      try {
1762        Map<String, Map<String, InetSocketAddress>> addressMap =
1763          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
1764        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
1765        if (addressMap.containsKey(nameService)) {
1766          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
1767          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
1768            InetSocketAddress addr = e2.getValue();
1769            addresses.add(addr);
1770          }
1771        }
1772      } catch (Exception e) {
1773        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
1774      }
1775    } else {
1776      URI uri = fs.getUri();
1777      int port = uri.getPort();
1778      if (port < 0) {
1779        int idx = serviceName.indexOf(':');
1780        port = Integer.parseInt(serviceName.substring(idx + 1));
1781      }
1782      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
1783      addresses.add(addr);
1784    }
1785
1786    return addresses;
1787  }
1788
1789  /**
   * Checks whether {@code srcFs} and {@code desFs} point to the same HDFS instance, taking HA
   * nameservices into account.
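   * <p>
   * A usage sketch ({@code backupFs} is illustrative):
   *
   * <pre>
   * {@code
   * FileSystem srcFs = FileSystem.get(conf);
   * boolean same = FSUtils.isSameHdfs(conf, srcFs, backupFs);
   * }
   * </pre>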
   * @param conf the Configuration of HBase
   * @param srcFs the source FileSystem
   * @param desFs the destination FileSystem
   * @return whether srcFs and desFs are on the same HDFS
1792   */
1793  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
    // getCanonicalServiceName gives both srcFs and desFs a unified format
    // which contains scheme, host and port.
1796    String srcServiceName = srcFs.getCanonicalServiceName();
1797    String desServiceName = desFs.getCanonicalServiceName();
1798
1799    if (srcServiceName == null || desServiceName == null) {
1800      return false;
1801    }
1802    if (srcServiceName.equals(desServiceName)) {
1803      return true;
1804    }
1805    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
1806      Collection<String> internalNameServices =
1807        conf.getTrimmedStringCollection("dfs.internal.nameservices");
      if (!internalNameServices.isEmpty()) {
        return internalNameServices.contains(srcServiceName.split(":")[1]);
      }
1815    }
1816    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
1817      // If one serviceName is an HA format while the other is a non-HA format,
1818      // maybe they refer to the same FileSystem.
1819      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
1820      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
1821      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
1822      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
1823        return true;
1824      }
1825    }
1826
1827    return false;
1828  }
1829}