001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations; 021import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET; 022 023import edu.umd.cs.findbugs.annotations.CheckForNull; 024import java.io.ByteArrayInputStream; 025import java.io.DataInputStream; 026import java.io.EOFException; 027import java.io.FileNotFoundException; 028import java.io.IOException; 029import java.io.InterruptedIOException; 030import java.lang.reflect.InvocationTargetException; 031import java.lang.reflect.Method; 032import java.net.InetSocketAddress; 033import java.net.URI; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Collection; 037import java.util.Collections; 038import java.util.HashMap; 039import java.util.HashSet; 040import java.util.Iterator; 041import java.util.List; 042import java.util.Locale; 043import java.util.Map; 044import java.util.Set; 045import java.util.Vector; 046import java.util.concurrent.ConcurrentHashMap; 047import java.util.concurrent.ExecutionException; 048import java.util.concurrent.ExecutorService; 049import java.util.concurrent.Executors; 050import java.util.concurrent.Future; 051import java.util.concurrent.FutureTask; 052import java.util.concurrent.ThreadPoolExecutor; 053import java.util.concurrent.TimeUnit; 054import java.util.regex.Pattern; 055import org.apache.commons.lang3.ArrayUtils; 056import org.apache.hadoop.conf.Configuration; 057import org.apache.hadoop.fs.BlockLocation; 058import org.apache.hadoop.fs.FSDataInputStream; 059import org.apache.hadoop.fs.FSDataOutputStream; 060import org.apache.hadoop.fs.FileStatus; 061import org.apache.hadoop.fs.FileSystem; 062import org.apache.hadoop.fs.FileUtil; 063import org.apache.hadoop.fs.Path; 064import org.apache.hadoop.fs.PathFilter; 065import org.apache.hadoop.fs.StorageType; 066import org.apache.hadoop.fs.permission.FsPermission; 067import org.apache.hadoop.hbase.ClusterId; 068import org.apache.hadoop.hbase.HColumnDescriptor; 069import org.apache.hadoop.hbase.HConstants; 070import org.apache.hadoop.hbase.HDFSBlocksDistribution; 071import org.apache.hadoop.hbase.TableName; 072import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 073import org.apache.hadoop.hbase.client.RegionInfo; 074import org.apache.hadoop.hbase.client.RegionInfoBuilder; 075import org.apache.hadoop.hbase.exceptions.DeserializationException; 076import org.apache.hadoop.hbase.fs.HFileSystem; 077import org.apache.hadoop.hbase.io.HFileLink; 078import org.apache.hadoop.hbase.master.HMaster; 079import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 080import 
org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;

/**
 * Utility methods for interacting with the underlying file system.
 */
@InterfaceAudience.Private
public final class FSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);

  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
  private static final int DEFAULT_THREAD_POOLSIZE = 2;

  /** Set to true on Windows platforms */
  // currently only used in testing. TODO refactor into a test class
  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

  private FSUtils() {
  }

  /** Returns true if <code>fs</code> is an instance of DistributedFileSystem. */
  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
    FileSystem fileSystem = fs;
    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
    // Check its backing fs for dfs-ness.
    if (fs instanceof HFileSystem) {
      fileSystem = ((HFileSystem) fs).getBackingFs();
    }
    return fileSystem instanceof DistributedFileSystem;
  }

  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider the scheme; i.e. if the schemes differ but the path or subpath matches, the two will
   * equate.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail     Path tail to match against the end of <code>pathToSearch</code>.
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;

    if (pathToSearch.depth() != pathTail.depth()) {
      return false;
    }

    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.isEmpty()) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.isEmpty()) {
        break;
      }
      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
160 tailPath = tailPath.getParent(); 161 toSearch = toSearch.getParent(); 162 } while (tailName.equals(toSearchName)); 163 return result; 164 } 165 166 /** 167 * Delete the region directory if exists. 168 * @return True if deleted the region directory. 169 */ 170 public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri) 171 throws IOException { 172 Path rootDir = CommonFSUtils.getRootDir(conf); 173 FileSystem fs = rootDir.getFileSystem(conf); 174 return CommonFSUtils.deleteDirectory(fs, 175 new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName())); 176 } 177 178 /** 179 * Create the specified file on the filesystem. By default, this will: 180 * <ol> 181 * <li>overwrite the file if it exists</li> 182 * <li>apply the umask in the configuration (if it is enabled)</li> 183 * <li>use the fs configured buffer size (or 4096 if not set)</li> 184 * <li>use the configured column family replication or default replication if 185 * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li> 186 * <li>use the default block size</li> 187 * <li>not track progress</li> 188 * </ol> 189 * @param conf configurations 190 * @param fs {@link FileSystem} on which to write the file 191 * @param path {@link Path} to the file to write 192 * @param perm permissions 193 * @param favoredNodes favored data nodes 194 * @return output stream to the created file 195 * @throws IOException if the file cannot be created 196 */ 197 public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path, 198 FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException { 199 if (fs instanceof HFileSystem) { 200 FileSystem backingFs = ((HFileSystem) fs).getBackingFs(); 201 if (backingFs instanceof DistributedFileSystem) { 202 short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION, 203 String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION))); 204 DistributedFileSystem.HdfsDataOutputStreamBuilder builder = 205 ((DistributedFileSystem) backingFs).createFile(path).recursive().permission(perm) 206 .create(); 207 if (favoredNodes != null) { 208 builder.favoredNodes(favoredNodes); 209 } 210 if (replication > 0) { 211 builder.replication(replication); 212 } 213 return builder.build(); 214 } 215 216 } 217 return CommonFSUtils.create(fs, path, perm, true); 218 } 219 220 /** 221 * Checks to see if the specified file system is available 222 * @param fs filesystem 223 * @throws IOException e 224 */ 225 public static void checkFileSystemAvailable(final FileSystem fs) throws IOException { 226 if (!(fs instanceof DistributedFileSystem)) { 227 return; 228 } 229 IOException exception = null; 230 DistributedFileSystem dfs = (DistributedFileSystem) fs; 231 try { 232 if (dfs.exists(new Path("/"))) { 233 return; 234 } 235 } catch (IOException e) { 236 exception = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 237 } 238 try { 239 fs.close(); 240 } catch (Exception e) { 241 LOG.error("file system close failed: ", e); 242 } 243 throw new IOException("File system is not available", exception); 244 } 245 246 /** 247 * Inquire the Active NameNode's safe mode status. 248 * @param dfs A DistributedFileSystem object representing the underlying HDFS. 249 * @return whether we're in safe mode 250 */ 251 private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException { 252 return dfs.setSafeMode(SAFEMODE_GET, true); 253 } 254 255 /** 256 * Check whether dfs is in safemode. 
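   * <p>
   * A minimal usage sketch; it assumes {@code conf} points at an HDFS-backed deployment, and the
   * setup line is illustrative only:
   *
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * // Throws an IOException if the NameNode is currently in safe mode.
   * FSUtils.checkDfsSafeMode(conf);
   * </pre>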
257 */ 258 public static void checkDfsSafeMode(final Configuration conf) throws IOException { 259 boolean isInSafeMode = false; 260 FileSystem fs = FileSystem.get(conf); 261 if (fs instanceof DistributedFileSystem) { 262 DistributedFileSystem dfs = (DistributedFileSystem) fs; 263 isInSafeMode = isInSafeMode(dfs); 264 } 265 if (isInSafeMode) { 266 throw new IOException("File system is in safemode, it can't be written now"); 267 } 268 } 269 270 /** 271 * Verifies current version of file system 272 * @param fs filesystem object 273 * @param rootdir root hbase directory 274 * @return null if no version file exists, version string otherwise 275 * @throws IOException if the version file fails to open 276 * @throws DeserializationException if the version data cannot be translated into a version 277 */ 278 public static String getVersion(FileSystem fs, Path rootdir) 279 throws IOException, DeserializationException { 280 final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 281 FileStatus[] status = null; 282 try { 283 // hadoop 2.0 throws FNFE if directory does not exist. 284 // hadoop 1.0 returns null if directory does not exist. 285 status = fs.listStatus(versionFile); 286 } catch (FileNotFoundException fnfe) { 287 return null; 288 } 289 if (ArrayUtils.getLength(status) == 0) { 290 return null; 291 } 292 String version = null; 293 byte[] content = new byte[(int) status[0].getLen()]; 294 FSDataInputStream s = fs.open(versionFile); 295 try { 296 IOUtils.readFully(s, content, 0, content.length); 297 if (ProtobufUtil.isPBMagicPrefix(content)) { 298 version = parseVersionFrom(content); 299 } else { 300 // Presume it pre-pb format. 301 try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) { 302 version = dis.readUTF(); 303 } 304 } 305 } catch (EOFException eof) { 306 LOG.warn("Version file was empty, odd, will try to set it."); 307 } finally { 308 s.close(); 309 } 310 return version; 311 } 312 313 /** 314 * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file. 315 * @param bytes The byte content of the hbase.version file 316 * @return The version found in the file as a String 317 * @throws DeserializationException if the version data cannot be translated into a version 318 */ 319 static String parseVersionFrom(final byte[] bytes) throws DeserializationException { 320 ProtobufUtil.expectPBMagicPrefix(bytes); 321 int pblen = ProtobufUtil.lengthOfPBMagic(); 322 FSProtos.HBaseVersionFileContent.Builder builder = 323 FSProtos.HBaseVersionFileContent.newBuilder(); 324 try { 325 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); 326 return builder.getVersion(); 327 } catch (IOException e) { 328 // Convert 329 throw new DeserializationException(e); 330 } 331 } 332 333 /** 334 * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file. 335 * @param version Version to persist 336 * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a 337 * prefix. 
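   * @see #parseVersionFrom(byte[])
   * @see #setVersion(FileSystem, Path, String, int, int)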
338 */ 339 static byte[] toVersionByteArray(final String version) { 340 FSProtos.HBaseVersionFileContent.Builder builder = 341 FSProtos.HBaseVersionFileContent.newBuilder(); 342 return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray()); 343 } 344 345 /** 346 * Verifies current version of file system 347 * @param fs file system 348 * @param rootdir root directory of HBase installation 349 * @param message if true, issues a message on System.out 350 * @throws IOException if the version file cannot be opened 351 * @throws DeserializationException if the contents of the version file cannot be parsed 352 */ 353 public static void checkVersion(FileSystem fs, Path rootdir, boolean message) 354 throws IOException, DeserializationException { 355 checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 356 } 357 358 /** 359 * Verifies current version of file system 360 * @param fs file system 361 * @param rootdir root directory of HBase installation 362 * @param message if true, issues a message on System.out 363 * @param wait wait interval 364 * @param retries number of times to retry 365 * @throws IOException if the version file cannot be opened 366 * @throws DeserializationException if the contents of the version file cannot be parsed 367 */ 368 public static void checkVersion(FileSystem fs, Path rootdir, boolean message, int wait, 369 int retries) throws IOException, DeserializationException { 370 String version = getVersion(fs, rootdir); 371 String msg; 372 if (version == null) { 373 if (!metaRegionExists(fs, rootdir)) { 374 // rootDir is empty (no version file and no root region) 375 // just create new version file (HBASE-1195) 376 setVersion(fs, rootdir, wait, retries); 377 return; 378 } else { 379 msg = "hbase.version file is missing. Is your hbase.rootdir valid? " 380 + "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " 381 + "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2"; 382 } 383 } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) { 384 return; 385 } else { 386 msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version 387 + " but software requires version " + HConstants.FILE_SYSTEM_VERSION 388 + ". Consult http://hbase.apache.org/book.html for further information about " 389 + "upgrading HBase."; 390 } 391 392 // version is deprecated require migration 393 // Output on stdout so user sees it in terminal. 394 if (message) { 395 System.out.println("WARNING! 
" + msg); 396 } 397 throw new FileSystemVersionException(msg); 398 } 399 400 /** 401 * Sets version of file system 402 * @param fs filesystem object 403 * @param rootdir hbase root 404 * @throws IOException e 405 */ 406 public static void setVersion(FileSystem fs, Path rootdir) throws IOException { 407 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0, 408 HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 409 } 410 411 /** 412 * Sets version of file system 413 * @param fs filesystem object 414 * @param rootdir hbase root 415 * @param wait time to wait for retry 416 * @param retries number of times to retry before failing 417 * @throws IOException e 418 */ 419 public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries) 420 throws IOException { 421 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries); 422 } 423 424 /** 425 * Sets version of file system 426 * @param fs filesystem object 427 * @param rootdir hbase root directory 428 * @param version version to set 429 * @param wait time to wait for retry 430 * @param retries number of times to retry before throwing an IOException 431 * @throws IOException e 432 */ 433 public static void setVersion(FileSystem fs, Path rootdir, String version, int wait, int retries) 434 throws IOException { 435 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 436 Path tempVersionFile = new Path(rootdir, 437 HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.VERSION_FILE_NAME); 438 while (true) { 439 try { 440 // Write the version to a temporary file 441 FSDataOutputStream s = fs.create(tempVersionFile); 442 try { 443 s.write(toVersionByteArray(version)); 444 s.close(); 445 s = null; 446 // Move the temp version file to its normal location. Returns false 447 // if the rename failed. Throw an IOE in that case. 448 if (!fs.rename(tempVersionFile, versionFile)) { 449 throw new IOException("Unable to move temp version file to " + versionFile); 450 } 451 } finally { 452 // Cleaning up the temporary if the rename failed would be trying 453 // too hard. We'll unconditionally create it again the next time 454 // through anyway, files are overwritten by default by create(). 455 456 // Attempt to close the stream on the way out if it is still open. 
457 try { 458 if (s != null) s.close(); 459 } catch (IOException ignore) { 460 } 461 } 462 LOG.info("Created version file at " + rootdir.toString() + " with version=" + version); 463 return; 464 } catch (IOException e) { 465 if (retries > 0) { 466 LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e); 467 fs.delete(versionFile, false); 468 try { 469 if (wait > 0) { 470 Thread.sleep(wait); 471 } 472 } catch (InterruptedException ie) { 473 throw (InterruptedIOException) new InterruptedIOException().initCause(ie); 474 } 475 retries--; 476 } else { 477 throw e; 478 } 479 } 480 } 481 } 482 483 /** 484 * Checks that a cluster ID file exists in the HBase root directory 485 * @param fs the root directory FileSystem 486 * @param rootdir the HBase root directory in HDFS 487 * @param wait how long to wait between retries 488 * @return <code>true</code> if the file exists, otherwise <code>false</code> 489 * @throws IOException if checking the FileSystem fails 490 */ 491 public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, long wait) 492 throws IOException { 493 while (true) { 494 try { 495 Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 496 return fs.exists(filePath); 497 } catch (IOException ioe) { 498 if (wait > 0L) { 499 LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe); 500 try { 501 Thread.sleep(wait); 502 } catch (InterruptedException e) { 503 Thread.currentThread().interrupt(); 504 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 505 } 506 } else { 507 throw ioe; 508 } 509 } 510 } 511 } 512 513 /** 514 * Returns the value of the unique cluster ID stored for this HBase instance. 515 * @param fs the root directory FileSystem 516 * @param rootdir the path to the HBase root directory 517 * @return the unique cluster identifier 518 * @throws IOException if reading the cluster ID file fails 519 */ 520 public static ClusterId getClusterId(FileSystem fs, Path rootdir) throws IOException { 521 Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 522 ClusterId clusterId = null; 523 FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null; 524 if (status != null) { 525 int len = Ints.checkedCast(status.getLen()); 526 byte[] content = new byte[len]; 527 FSDataInputStream in = fs.open(idPath); 528 try { 529 in.readFully(content); 530 } catch (EOFException eof) { 531 LOG.warn("Cluster ID file {} is empty", idPath); 532 } finally { 533 in.close(); 534 } 535 try { 536 clusterId = ClusterId.parseFrom(content); 537 } catch (DeserializationException e) { 538 throw new IOException("content=" + Bytes.toString(content), e); 539 } 540 // If not pb'd, make it so. 541 if (!ProtobufUtil.isPBMagicPrefix(content)) { 542 String cid = null; 543 in = fs.open(idPath); 544 try { 545 cid = in.readUTF(); 546 clusterId = new ClusterId(cid); 547 } catch (EOFException eof) { 548 LOG.warn("Cluster ID file {} is empty", idPath); 549 } finally { 550 in.close(); 551 } 552 rewriteAsPb(fs, rootdir, idPath, clusterId); 553 } 554 return clusterId; 555 } else { 556 LOG.warn("Cluster ID file does not exist at {}", idPath); 557 } 558 return clusterId; 559 } 560 561 /** 562 * */ 563 private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p, 564 final ClusterId cid) throws IOException { 565 // Rewrite the file as pb. Move aside the old one first, write new 566 // then delete the moved-aside file. 567 Path movedAsideName = new Path(p + "." 
+ EnvironmentEdgeManager.currentTime()); 568 if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p); 569 setClusterId(fs, rootdir, cid, 100); 570 if (!fs.delete(movedAsideName, false)) { 571 throw new IOException("Failed delete of " + movedAsideName); 572 } 573 LOG.debug("Rewrote the hbase.id file as pb"); 574 } 575 576 /** 577 * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root 578 * directory 579 * @param fs the root directory FileSystem 580 * @param rootdir the path to the HBase root directory 581 * @param clusterId the unique identifier to store 582 * @param wait how long (in milliseconds) to wait between retries 583 * @throws IOException if writing to the FileSystem fails and no wait value 584 */ 585 public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId, int wait) 586 throws IOException { 587 while (true) { 588 try { 589 Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 590 Path tempIdFile = new Path(rootdir, 591 HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME); 592 // Write the id file to a temporary location 593 FSDataOutputStream s = fs.create(tempIdFile); 594 try { 595 s.write(clusterId.toByteArray()); 596 s.close(); 597 s = null; 598 // Move the temporary file to its normal location. Throw an IOE if 599 // the rename failed 600 if (!fs.rename(tempIdFile, idFile)) { 601 throw new IOException("Unable to move temp version file to " + idFile); 602 } 603 } finally { 604 // Attempt to close the stream if still open on the way out 605 try { 606 if (s != null) s.close(); 607 } catch (IOException ignore) { 608 } 609 } 610 if (LOG.isDebugEnabled()) { 611 LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId); 612 } 613 return; 614 } catch (IOException ioe) { 615 if (wait > 0) { 616 LOG.warn("Unable to create cluster ID file in " + rootdir.toString() + ", retrying in " 617 + wait + "msec: " + StringUtils.stringifyException(ioe)); 618 try { 619 Thread.sleep(wait); 620 } catch (InterruptedException e) { 621 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 622 } 623 } else { 624 throw ioe; 625 } 626 } 627 } 628 } 629 630 /** 631 * If DFS, check safe mode and if so, wait until we clear it. 632 * @param conf configuration 633 * @param wait Sleep between retries 634 * @throws IOException e 635 */ 636 public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException { 637 FileSystem fs = FileSystem.get(conf); 638 if (!(fs instanceof DistributedFileSystem)) return; 639 DistributedFileSystem dfs = (DistributedFileSystem) fs; 640 // Make sure dfs is not in safe mode 641 while (isInSafeMode(dfs)) { 642 LOG.info("Waiting for dfs to exit safe mode..."); 643 try { 644 Thread.sleep(wait); 645 } catch (InterruptedException e) { 646 Thread.currentThread().interrupt(); 647 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 648 } 649 } 650 } 651 652 /** 653 * Checks if meta region exists 654 * @param fs file system 655 * @param rootDir root directory of HBase installation 656 * @return true if exists 657 */ 658 public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException { 659 Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO); 660 return fs.exists(metaRegionDir); 661 } 662 663 /** 664 * Compute HDFS block distribution of a given HdfsDataInputStream. 
All HdfsDataInputStreams are
   * backed by a series of LocatedBlocks, which are fetched periodically from the namenode. This
   * method retrieves those blocks from the input stream and uses them to calculate
   * HDFSBlocksDistribution. The underlying method in DFSInputStream does attempt to use locally
   * cached blocks, but may hit the namenode if the cache is determined to be incomplete. The
   * method also involves making copies of all LocatedBlocks rather than returning the underlying
   * blocks themselves.
   */
  public static HDFSBlocksDistribution
    computeHDFSBlocksDistribution(HdfsDataInputStream inputStream) throws IOException {
    List<LocatedBlock> blocks = inputStream.getAllBlocks();
    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
    for (LocatedBlock block : blocks) {
      String[] hosts = getHostsForLocations(block);
      long len = block.getBlockSize();
      StorageType[] storageTypes = block.getStorageTypes();
      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
    }
    return blocksDistribution;
  }

  private static String[] getHostsForLocations(LocatedBlock block) {
    DatanodeInfo[] locations = getLocatedBlockLocations(block);
    String[] hosts = new String[locations.length];
    for (int i = 0; i < hosts.length; i++) {
      hosts[i] = locations[i].getHostName();
    }
    return hosts;
  }

  /**
   * Compute HDFS blocks distribution of a given file, or a portion of the file
   * @param fs     file system
   * @param status file status of the file
   * @param start  start position of the portion
   * @param length length of the portion
   * @return The HDFS blocks distribution
   */
  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs,
    FileStatus status, long start, long length) throws IOException {
    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
    BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length);
    addToHDFSBlocksDistribution(blocksDistribution, blockLocations);
    return blocksDistribution;
  }

  /**
   * Update blocksDistribution with blockLocations
   * @param blocksDistribution the hdfs blocks distribution
   * @param blockLocations     an array containing block locations
   */
  public static void addToHDFSBlocksDistribution(HDFSBlocksDistribution blocksDistribution,
    BlockLocation[] blockLocations) throws IOException {
    for (BlockLocation bl : blockLocations) {
      String[] hosts = bl.getHosts();
      long len = bl.getLength();
      StorageType[] storageTypes = bl.getStorageTypes();
      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
    }
  }

  // TODO move this method OUT of FSUtils. It should not depend on HMaster.
  /**
   * Returns the total overall fragmentation percentage. Includes hbase:meta and -ROOT- as well.
   * @param master The master defining the HBase root and file system
   * @return The overall fragmentation percentage across all tables, or -1 if it cannot be
   *         computed
   * @throws IOException When scanning the directory fails
   */
  public static int getTotalTableFragmentation(final HMaster master) throws IOException {
    Map<String, Integer> map = getTableFragmentation(master);
    return map.isEmpty() ? -1 : map.get("-TOTAL-");
  }

  /**
   * Runs through the HBase rootdir and checks how many stores for each table have more than one
   * file in them. Checks -ROOT- and hbase:meta too.
The total percentage across all tables is 740 * stored under the special key "-TOTAL-". 741 * @param master The master defining the HBase root and file system. 742 * @return A map for each table and its percentage (never null). 743 * @throws IOException When scanning the directory fails. 744 */ 745 public static Map<String, Integer> getTableFragmentation(final HMaster master) 746 throws IOException { 747 Path path = CommonFSUtils.getRootDir(master.getConfiguration()); 748 // since HMaster.getFileSystem() is package private 749 FileSystem fs = path.getFileSystem(master.getConfiguration()); 750 return getTableFragmentation(fs, path); 751 } 752 753 /** 754 * Runs through the HBase rootdir and checks how many stores for each table have more than one 755 * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is 756 * stored under the special key "-TOTAL-". 757 * @param fs The file system to use 758 * @param hbaseRootDir The root directory to scan 759 * @return A map for each table and its percentage (never null) 760 * @throws IOException When scanning the directory fails 761 */ 762 public static Map<String, Integer> getTableFragmentation(final FileSystem fs, 763 final Path hbaseRootDir) throws IOException { 764 Map<String, Integer> frags = new HashMap<>(); 765 int cfCountTotal = 0; 766 int cfFragTotal = 0; 767 PathFilter regionFilter = new RegionDirFilter(fs); 768 PathFilter familyFilter = new FamilyDirFilter(fs); 769 List<Path> tableDirs = getTableDirs(fs, hbaseRootDir); 770 for (Path d : tableDirs) { 771 int cfCount = 0; 772 int cfFrag = 0; 773 FileStatus[] regionDirs = fs.listStatus(d, regionFilter); 774 for (FileStatus regionDir : regionDirs) { 775 Path dd = regionDir.getPath(); 776 // else its a region name, now look in region for families 777 FileStatus[] familyDirs = fs.listStatus(dd, familyFilter); 778 for (FileStatus familyDir : familyDirs) { 779 cfCount++; 780 cfCountTotal++; 781 Path family = familyDir.getPath(); 782 // now in family make sure only one file 783 FileStatus[] familyStatus = fs.listStatus(family); 784 if (familyStatus.length > 1) { 785 cfFrag++; 786 cfFragTotal++; 787 } 788 } 789 } 790 // compute percentage per table and store in result list 791 frags.put(CommonFSUtils.getTableName(d).getNameAsString(), 792 cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100)); 793 } 794 // set overall percentage for all tables 795 frags.put("-TOTAL-", 796 cfCountTotal == 0 ? 0 : Math.round((float) cfFragTotal / cfCountTotal * 100)); 797 return frags; 798 } 799 800 /** 801 * A {@link PathFilter} that returns only regular files. 802 */ 803 static class FileFilter extends AbstractFileStatusFilter { 804 private final FileSystem fs; 805 806 public FileFilter(final FileSystem fs) { 807 this.fs = fs; 808 } 809 810 @Override 811 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 812 try { 813 return isFile(fs, isDir, p); 814 } catch (IOException e) { 815 LOG.warn("Unable to verify if path={} is a regular file", p, e); 816 return false; 817 } 818 } 819 } 820 821 /** 822 * Directory filter that doesn't include any of the directories in the specified blacklist 823 */ 824 public static class BlackListDirFilter extends AbstractFileStatusFilter { 825 private final FileSystem fs; 826 private List<String> blacklist; 827 828 /** 829 * Create a filter on the givem filesystem with the specified blacklist 830 * @param fs filesystem to filter 831 * @param directoryNameBlackList list of the names of the directories to filter. 
If 832 * <tt>null</tt>, all directories are returned 833 */ 834 @SuppressWarnings("unchecked") 835 public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) { 836 this.fs = fs; 837 blacklist = (List<String>) (directoryNameBlackList == null 838 ? Collections.emptyList() 839 : directoryNameBlackList); 840 } 841 842 @Override 843 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 844 if (!isValidName(p.getName())) { 845 return false; 846 } 847 848 try { 849 return isDirectory(fs, isDir, p); 850 } catch (IOException e) { 851 LOG.warn("An error occurred while verifying if [{}] is a valid directory." 852 + " Returning 'not valid' and continuing.", p, e); 853 return false; 854 } 855 } 856 857 protected boolean isValidName(final String name) { 858 return !blacklist.contains(name); 859 } 860 } 861 862 /** 863 * A {@link PathFilter} that only allows directories. 864 */ 865 public static class DirFilter extends BlackListDirFilter { 866 867 public DirFilter(FileSystem fs) { 868 super(fs, null); 869 } 870 } 871 872 /** 873 * A {@link PathFilter} that returns usertable directories. To get all directories use the 874 * {@link BlackListDirFilter} with a <tt>null</tt> blacklist 875 */ 876 public static class UserTableDirFilter extends BlackListDirFilter { 877 public UserTableDirFilter(FileSystem fs) { 878 super(fs, HConstants.HBASE_NON_TABLE_DIRS); 879 } 880 881 @Override 882 protected boolean isValidName(final String name) { 883 if (!super.isValidName(name)) return false; 884 885 try { 886 TableName.isLegalTableQualifierName(Bytes.toBytes(name)); 887 } catch (IllegalArgumentException e) { 888 LOG.info("Invalid table name: {}", name); 889 return false; 890 } 891 return true; 892 } 893 } 894 895 public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir) 896 throws IOException { 897 List<Path> tableDirs = new ArrayList<>(); 898 Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR); 899 if (fs.exists(baseNamespaceDir)) { 900 for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) { 901 tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath())); 902 } 903 } 904 return tableDirs; 905 } 906 907 /** 908 * @return All the table directories under <code>rootdir</code>. Ignore non table hbase folders 909 * such as .logs, .oldlogs, .corrupt folders. 910 */ 911 public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir) 912 throws IOException { 913 // presumes any directory under hbase.rootdir is a table 914 FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs)); 915 List<Path> tabledirs = new ArrayList<>(dirs.length); 916 for (FileStatus dir : dirs) { 917 tabledirs.add(dir.getPath()); 918 } 919 return tabledirs; 920 } 921 922 /** 923 * Filter for all dirs that don't start with '.' 924 */ 925 public static class RegionDirFilter extends AbstractFileStatusFilter { 926 // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names. 927 final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$"); 928 final FileSystem fs; 929 930 public RegionDirFilter(FileSystem fs) { 931 this.fs = fs; 932 } 933 934 @Override 935 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 936 if (!regionDirPattern.matcher(p.getName()).matches()) { 937 return false; 938 } 939 940 try { 941 return isDirectory(fs, isDir, p); 942 } catch (IOException ioe) { 943 // Maybe the file was moved or the fs was disconnected. 
944 LOG.warn("Skipping file {} due to IOException", p, ioe); 945 return false; 946 } 947 } 948 } 949 950 /** 951 * Given a particular table dir, return all the regiondirs inside it, excluding files such as 952 * .tableinfo 953 * @param fs A file system for the Path 954 * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir> 955 * @return List of paths to valid region directories in table dir. 956 */ 957 public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) 958 throws IOException { 959 // assumes we are in a table dir. 960 List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 961 if (rds == null) { 962 return Collections.emptyList(); 963 } 964 List<Path> regionDirs = new ArrayList<>(rds.size()); 965 for (FileStatus rdfs : rds) { 966 Path rdPath = rdfs.getPath(); 967 regionDirs.add(rdPath); 968 } 969 return regionDirs; 970 } 971 972 public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) { 973 return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region); 974 } 975 976 public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) { 977 return getRegionDirFromTableDir(tableDir, 978 ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName()); 979 } 980 981 public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) { 982 return new Path(tableDir, encodedRegionName); 983 } 984 985 /** 986 * Filter for all dirs that are legal column family names. This is generally used for colfam dirs 987 * <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>. 988 */ 989 public static class FamilyDirFilter extends AbstractFileStatusFilter { 990 final FileSystem fs; 991 992 public FamilyDirFilter(FileSystem fs) { 993 this.fs = fs; 994 } 995 996 @Override 997 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 998 try { 999 // throws IAE if invalid 1000 HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName())); 1001 } catch (IllegalArgumentException iae) { 1002 // path name is an invalid family name and thus is excluded. 1003 return false; 1004 } 1005 1006 try { 1007 return isDirectory(fs, isDir, p); 1008 } catch (IOException ioe) { 1009 // Maybe the file was moved or the fs was disconnected. 1010 LOG.warn("Skipping file {} due to IOException", p, ioe); 1011 return false; 1012 } 1013 } 1014 } 1015 1016 /** 1017 * Given a particular region dir, return all the familydirs inside it 1018 * @param fs A file system for the Path 1019 * @param regionDir Path to a specific region directory 1020 * @return List of paths to valid family directories in region dir. 1021 */ 1022 public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) 1023 throws IOException { 1024 // assumes we are in a region dir. 
1025 return getFilePaths(fs, regionDir, new FamilyDirFilter(fs)); 1026 } 1027 1028 public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) 1029 throws IOException { 1030 return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs)); 1031 } 1032 1033 public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir) 1034 throws IOException { 1035 return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs)); 1036 } 1037 1038 private static List<Path> getFilePaths(final FileSystem fs, final Path dir, 1039 final PathFilter pathFilter) throws IOException { 1040 FileStatus[] fds = fs.listStatus(dir, pathFilter); 1041 List<Path> files = new ArrayList<>(fds.length); 1042 for (FileStatus fdfs : fds) { 1043 Path fdPath = fdfs.getPath(); 1044 files.add(fdPath); 1045 } 1046 return files; 1047 } 1048 1049 public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) { 1050 int result = 0; 1051 try { 1052 for (Path familyDir : getFamilyDirs(fs, p)) { 1053 result += getReferenceAndLinkFilePaths(fs, familyDir).size(); 1054 } 1055 } catch (IOException e) { 1056 LOG.warn("Error Counting reference files.", e); 1057 } 1058 return result; 1059 } 1060 1061 public static class ReferenceAndLinkFileFilter implements PathFilter { 1062 1063 private final FileSystem fs; 1064 1065 public ReferenceAndLinkFileFilter(FileSystem fs) { 1066 this.fs = fs; 1067 } 1068 1069 @Override 1070 public boolean accept(Path rd) { 1071 try { 1072 // only files can be references. 1073 return !fs.getFileStatus(rd).isDirectory() 1074 && (StoreFileInfo.isReference(rd) || HFileLink.isHFileLink(rd)); 1075 } catch (IOException ioe) { 1076 // Maybe the file was moved or the fs was disconnected. 1077 LOG.warn("Skipping file " + rd + " due to IOException", ioe); 1078 return false; 1079 } 1080 } 1081 } 1082 1083 /** 1084 * Filter for HFiles that excludes reference files. 1085 */ 1086 public static class HFileFilter extends AbstractFileStatusFilter { 1087 final FileSystem fs; 1088 1089 public HFileFilter(FileSystem fs) { 1090 this.fs = fs; 1091 } 1092 1093 @Override 1094 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1095 if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) { 1096 return false; 1097 } 1098 1099 try { 1100 return isFile(fs, isDir, p); 1101 } catch (IOException ioe) { 1102 // Maybe the file was moved or the fs was disconnected. 1103 LOG.warn("Skipping file {} due to IOException", p, ioe); 1104 return false; 1105 } 1106 } 1107 } 1108 1109 /** 1110 * Filter for HFileLinks (StoreFiles and HFiles not included). the filter itself does not consider 1111 * if a link is file or not. 1112 */ 1113 public static class HFileLinkFilter implements PathFilter { 1114 1115 @Override 1116 public boolean accept(Path p) { 1117 return HFileLink.isHFileLink(p); 1118 } 1119 } 1120 1121 public static class ReferenceFileFilter extends AbstractFileStatusFilter { 1122 1123 private final FileSystem fs; 1124 1125 public ReferenceFileFilter(FileSystem fs) { 1126 this.fs = fs; 1127 } 1128 1129 @Override 1130 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1131 if (!StoreFileInfo.isReference(p)) { 1132 return false; 1133 } 1134 1135 try { 1136 // only files can be references. 1137 return isFile(fs, isDir, p); 1138 } catch (IOException ioe) { 1139 // Maybe the file was moved or the fs was disconnected. 
1140 LOG.warn("Skipping file {} due to IOException", p, ioe); 1141 return false; 1142 } 1143 } 1144 } 1145 1146 /** 1147 * Called every so-often by storefile map builder getTableStoreFilePathMap to report progress. 1148 */ 1149 interface ProgressReporter { 1150 /** 1151 * @param status File or directory we are about to process. 1152 */ 1153 void progress(FileStatus status); 1154 } 1155 1156 /** 1157 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1158 * names to the full Path. <br> 1159 * Example...<br> 1160 * Key = 3944417774205889744 <br> 1161 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1162 * @param map map to add values. If null, this method will create and populate one to 1163 * return 1164 * @param fs The file system to use. 1165 * @param hbaseRootDir The root directory to scan. 1166 * @param tableName name of the table to scan. 1167 * @return Map keyed by StoreFile name with a value of the full Path. 1168 * @throws IOException When scanning the directory fails. 1169 */ 1170 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map, 1171 final FileSystem fs, final Path hbaseRootDir, TableName tableName) 1172 throws IOException, InterruptedException { 1173 return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, 1174 (ProgressReporter) null); 1175 } 1176 1177 /** 1178 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1179 * names to the full Path. Note that because this method can be called on a 'live' HBase system 1180 * that we will skip files that no longer exist by the time we traverse them and similarly the 1181 * user of the result needs to consider that some entries in this map may not exist by the time 1182 * this call completes. <br> 1183 * Example...<br> 1184 * Key = 3944417774205889744 <br> 1185 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1186 * @param resultMap map to add values. If null, this method will create and populate one to 1187 * return 1188 * @param fs The file system to use. 1189 * @param hbaseRootDir The root directory to scan. 1190 * @param tableName name of the table to scan. 1191 * @param sfFilter optional path filter to apply to store files 1192 * @param executor optional executor service to parallelize this operation 1193 * @param progressReporter Instance or null; gets called every time we move to new region of 1194 * family dir and for each store file. 1195 * @return Map keyed by StoreFile name with a value of the full Path. 1196 * @throws IOException When scanning the directory fails. 1197 * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead. 1198 */ 1199 @Deprecated 1200 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1201 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1202 ExecutorService executor, final HbckErrorReporter progressReporter) 1203 throws IOException, InterruptedException { 1204 return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor, 1205 new ProgressReporter() { 1206 @Override 1207 public void progress(FileStatus status) { 1208 // status is not used in this implementation. 
1209 progressReporter.progress(); 1210 } 1211 }); 1212 } 1213 1214 /** 1215 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1216 * names to the full Path. Note that because this method can be called on a 'live' HBase system 1217 * that we will skip files that no longer exist by the time we traverse them and similarly the 1218 * user of the result needs to consider that some entries in this map may not exist by the time 1219 * this call completes. <br> 1220 * Example...<br> 1221 * Key = 3944417774205889744 <br> 1222 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1223 * @param resultMap map to add values. If null, this method will create and populate one to 1224 * return 1225 * @param fs The file system to use. 1226 * @param hbaseRootDir The root directory to scan. 1227 * @param tableName name of the table to scan. 1228 * @param sfFilter optional path filter to apply to store files 1229 * @param executor optional executor service to parallelize this operation 1230 * @param progressReporter Instance or null; gets called every time we move to new region of 1231 * family dir and for each store file. 1232 * @return Map keyed by StoreFile name with a value of the full Path. 1233 * @throws IOException When scanning the directory fails. 1234 * @throws InterruptedException the thread is interrupted, either before or during the activity. 1235 */ 1236 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1237 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1238 ExecutorService executor, final ProgressReporter progressReporter) 1239 throws IOException, InterruptedException { 1240 1241 final Map<String, Path> finalResultMap = 1242 resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap; 1243 1244 // only include the directory paths to tables 1245 Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 1246 // Inside a table, there are compaction.dir directories to skip. Otherwise, all else 1247 // should be regions. 
1248 final FamilyDirFilter familyFilter = new FamilyDirFilter(fs); 1249 final Vector<Exception> exceptions = new Vector<>(); 1250 1251 try { 1252 List<FileStatus> regionDirs = 1253 FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 1254 if (regionDirs == null) { 1255 return finalResultMap; 1256 } 1257 1258 final List<Future<?>> futures = new ArrayList<>(regionDirs.size()); 1259 1260 for (FileStatus regionDir : regionDirs) { 1261 if (null != progressReporter) { 1262 progressReporter.progress(regionDir); 1263 } 1264 final Path dd = regionDir.getPath(); 1265 1266 if (!exceptions.isEmpty()) { 1267 break; 1268 } 1269 1270 Runnable getRegionStoreFileMapCall = new Runnable() { 1271 @Override 1272 public void run() { 1273 try { 1274 HashMap<String, Path> regionStoreFileMap = new HashMap<>(); 1275 List<FileStatus> familyDirs = 1276 FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter); 1277 if (familyDirs == null) { 1278 if (!fs.exists(dd)) { 1279 LOG.warn("Skipping region because it no longer exists: " + dd); 1280 } else { 1281 LOG.warn("Skipping region because it has no family dirs: " + dd); 1282 } 1283 return; 1284 } 1285 for (FileStatus familyDir : familyDirs) { 1286 if (null != progressReporter) { 1287 progressReporter.progress(familyDir); 1288 } 1289 Path family = familyDir.getPath(); 1290 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) { 1291 continue; 1292 } 1293 // now in family, iterate over the StoreFiles and 1294 // put in map 1295 FileStatus[] familyStatus = fs.listStatus(family); 1296 for (FileStatus sfStatus : familyStatus) { 1297 if (null != progressReporter) { 1298 progressReporter.progress(sfStatus); 1299 } 1300 Path sf = sfStatus.getPath(); 1301 if (sfFilter == null || sfFilter.accept(sf)) { 1302 regionStoreFileMap.put(sf.getName(), sf); 1303 } 1304 } 1305 } 1306 finalResultMap.putAll(regionStoreFileMap); 1307 } catch (Exception e) { 1308 LOG.error("Could not get region store file map for region: " + dd, e); 1309 exceptions.add(e); 1310 } 1311 } 1312 }; 1313 1314 // If executor is available, submit async tasks to exec concurrently, otherwise 1315 // just do serial sync execution 1316 if (executor != null) { 1317 Future<?> future = executor.submit(getRegionStoreFileMapCall); 1318 futures.add(future); 1319 } else { 1320 FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null); 1321 future.run(); 1322 futures.add(future); 1323 } 1324 } 1325 1326 // Ensure all pending tasks are complete (or that we run into an exception) 1327 for (Future<?> f : futures) { 1328 if (!exceptions.isEmpty()) { 1329 break; 1330 } 1331 try { 1332 f.get(); 1333 } catch (ExecutionException e) { 1334 LOG.error("Unexpected exec exception! Should've been caught already. 
(Bug?)", e); 1335 // Shouldn't happen, we already logged/caught any exceptions in the Runnable 1336 } 1337 } 1338 } catch (IOException e) { 1339 LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e); 1340 exceptions.add(e); 1341 } finally { 1342 if (!exceptions.isEmpty()) { 1343 // Just throw the first exception as an indication something bad happened 1344 // Don't need to propagate all the exceptions, we already logged them all anyway 1345 Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class); 1346 throw new IOException(exceptions.firstElement()); 1347 } 1348 } 1349 1350 return finalResultMap; 1351 } 1352 1353 public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) { 1354 int result = 0; 1355 try { 1356 for (Path familyDir : getFamilyDirs(fs, p)) { 1357 result += getReferenceFilePaths(fs, familyDir).size(); 1358 } 1359 } catch (IOException e) { 1360 LOG.warn("Error counting reference files", e); 1361 } 1362 return result; 1363 } 1364 1365 /** 1366 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1367 * the full Path. <br> 1368 * Example...<br> 1369 * Key = 3944417774205889744 <br> 1370 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1371 * @param fs The file system to use. 1372 * @param hbaseRootDir The root directory to scan. 1373 * @return Map keyed by StoreFile name with a value of the full Path. 1374 * @throws IOException When scanning the directory fails. 1375 */ 1376 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1377 final Path hbaseRootDir) throws IOException, InterruptedException { 1378 return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null); 1379 } 1380 1381 /** 1382 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1383 * the full Path. <br> 1384 * Example...<br> 1385 * Key = 3944417774205889744 <br> 1386 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1387 * @param fs The file system to use. 1388 * @param hbaseRootDir The root directory to scan. 1389 * @param sfFilter optional path filter to apply to store files 1390 * @param executor optional executor service to parallelize this operation 1391 * @param progressReporter Instance or null; gets called every time we move to new region of 1392 * family dir and for each store file. 1393 * @return Map keyed by StoreFile name with a value of the full Path. 1394 * @throws IOException When scanning the directory fails. 1395 * @deprecated Since 2.3.0. Will be removed in hbase4. Used 1396 * {@link #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)} 1397 */ 1398 @Deprecated 1399 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1400 final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor, 1401 HbckErrorReporter progressReporter) throws IOException, InterruptedException { 1402 return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, new ProgressReporter() { 1403 @Override 1404 public void progress(FileStatus status) { 1405 // status is not used in this implementation. 1406 progressReporter.progress(); 1407 } 1408 }); 1409 } 1410 1411 /** 1412 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1413 * the full Path. 
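   * <p>
   * A hedged usage sketch; {@code fs} and {@code rootDir} are assumed to be the HBase root
   * directory and its filesystem, and the {@code null} arguments simply skip the optional store
   * file filter, executor and progress reporter:
   *
   * <pre>
   * Map&lt;String, Path&gt; storeFiles =
   *   FSUtils.getTableStoreFilePathMap(fs, rootDir, null, null, (ProgressReporter) null);
   * </pre>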
<br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   * @param fs               The file system to use.
   * @param hbaseRootDir     The root directory to scan.
   * @param sfFilter         optional path filter to apply to store files
   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to new region of
   *                         family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException          When scanning the directory fails.
   * @throws InterruptedException the thread is interrupted, either before or during the activity.
   */
  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
    ProgressReporter progressReporter) throws IOException, InterruptedException {
    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);

    // if this method looks similar to 'getTableFragmentation' that is because
    // it was borrowed from it.

    // only include the directory paths to tables
    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
      getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
        sfFilter, executor, progressReporter);
    }
    return map;
  }

  /**
   * Filters FileStatuses in an array and returns a list
   * @param input  An array of FileStatuses
   * @param filter A required filter to filter the array
   * @return A list of FileStatuses
   */
  public static List<FileStatus> filterFileStatuses(FileStatus[] input, FileStatusFilter filter) {
    if (input == null) return null;
    return filterFileStatuses(Iterators.forArray(input), filter);
  }

  /**
   * Filters FileStatuses in an iterator and returns a list
   * @param input  An iterator of FileStatuses
   * @param filter A required filter to filter the iterator
   * @return A list of FileStatuses
   */
  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
    FileStatusFilter filter) {
    if (input == null) return null;
    ArrayList<FileStatus> results = new ArrayList<>();
    while (input.hasNext()) {
      FileStatus f = input.next();
      if (filter.accept(f)) {
        results.add(f);
      }
    }
    return results;
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between Hadoop versions, where Hadoop 1 does not throw a FileNotFoundException
   * but returns an empty FileStatus[], while Hadoop 2 will throw FileNotFoundException.
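   * <p>
   * A short usage sketch; {@code fs} and {@code tableDir} are assumed to exist, and the
   * {@link RegionDirFilter} is just an illustrative choice of filter (it is how
   * {@link #getRegionDirs(FileSystem, Path)} uses this method):
   *
   * <pre>
   * List&lt;FileStatus&gt; regionDirs =
   *   FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
   * if (regionDirs == null) {
   *   // The directory is missing or empty; callers treat this as "no entries".
   * }
   * </pre>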
   * @param fs     file system
   * @param dir    directory
   * @param filter file status filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
   */
  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, final Path dir,
    final FileStatusFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = fs.listStatus(dir);
    } catch (FileNotFoundException fnfe) {
      LOG.trace("{} does not exist", dir);
      return null;
    }

    if (ArrayUtils.getLength(status) == 0) {
      return null;
    }

    if (filter == null) {
      return Arrays.asList(status);
    } else {
      List<FileStatus> status2 = filterFileStatuses(status, filter);
      if (status2 == null || status2.isEmpty()) {
        return null;
      } else {
        return status2;
      }
    }
  }

  /**
   * Scans the root path of the file system to get the degree of locality for each region on each
   * of the servers having at least one block of that region. This is used by the tool
   * {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
   * @param conf the configuration to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>>
    getRegionDegreeLocalityMappingFromFS(final Configuration conf) throws IOException {
    return getRegionDegreeLocalityMappingFromFS(conf, null,
      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
  }

  /**
   * Scans the root path of the file system to get the degree of locality for each region on each
   * of the servers having at least one block of that region.
   * @param conf           the configuration to use
   * @param desiredTable   the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
    final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException {
    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
    return regionDegreeLocalityMapping;
  }

  /**
   * This function is to scan the root path of the file system to get either the mapping between
   * the region name and its best locality region server or the degree of locality of each region
   * on each of the servers having at least one block of that region. The output map parameters
   * are both optional.
   * @param conf           the configuration to use
   * @param desiredTable   the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
    final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException {
    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
    return regionDegreeLocalityMapping;
  }

  /**
   * Scans the root path of the file system to get either the mapping between the region name and
   * its best locality region server, or the degree of locality of each region on each of the
   * servers having at least one block of that region. The output map parameter is optional.
   * @param conf                        the configuration to use
   * @param desiredTable                the table you wish to scan locality for, or null to scan
   *                                    all tables
   * @param threadPoolSize              the thread pool size to use
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping, or
   *                                    null; must be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
   */
  private static void getRegionLocalityMappingFromFS(final Configuration conf,
    final String desiredTable, int threadPoolSize,
    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
    final FileSystem fs = FileSystem.get(conf);
    final Path rootPath = CommonFSUtils.getRootDir(conf);
    final long startTime = EnvironmentEdgeManager.currentTime();
    final Path queryPath;
    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
    if (null == desiredTable) {
      queryPath =
        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
    } else {
      queryPath = new Path(
        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
    }

    // reject all paths that are not appropriate
    PathFilter pathFilter = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        // this is the region name; it may get some noise data
        if (null == path) {
          return false;
        }

        // no parent?
        Path parent = path.getParent();
        if (null == parent) {
          return false;
        }

        String regionName = path.getName();
        if (null == regionName) {
          return false;
        }

        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
          return false;
        }
        return true;
      }
    };

    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
    }

    if (null == statusList) {
      return;
    }

    // lower the number of threads in case we have very few expected regions
    threadPoolSize = Math.min(threadPoolSize, statusList.length);

    // run in multiple threads
    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    try {
      // ignore all file status items that are not of interest
      for (FileStatus regionStatus : statusList) {
        if (null == regionStatus || !regionStatus.isDirectory()) {
          continue;
        }

        final Path regionPath = regionStatus.getPath();
        if (null != regionPath) {
          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
        }
      }
    } finally {
      tpe.shutdown();
      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
      try {
        // here we wait until the TPE terminates, either naturally or because of exceptions
        // thrown by the worker threads
        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
          // printing out a rough estimate, so as to not introduce an AtomicInteger
          LOG.info("Locality checking is underway: { Scanned Regions : "
            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
            + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }

    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info("Scan DFS for locality info took {}ms", overhead);
  }

  /**
   * Do our short-circuit read setup. Checks the buffer size to use and whether to do checksumming
   * in HBase or HDFS.
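   * <p>
   * A hypothetical client-side configuration this method is meant to complement; the domain
   * socket path is an assumption for illustration, not something this method sets:
   *
   * <pre>{@code
   * conf.setBoolean("dfs.client.read.shortcircuit", true);
   * conf.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket");
   * // Leave dfs.client.read.shortcircuit.skip.checksum unset; HBase-level checksums make it
   * // unnecessary (see HBASE-6868).
   * FSUtils.setupShortCircuitRead(conf);
   * }</pre>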
   */
  public static void setupShortCircuitRead(final Configuration conf) {
    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
    boolean shortCircuitSkipChecksum =
      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    if (shortCircuitSkipChecksum) {
      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not "
        + "be set to true."
        + (useHBaseChecksum
          ? " HBase checksum doesn't require "
            + "it, see https://issues.apache.org/jira/browse/HBASE-6868."
          : ""));
      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
    }
    checkShortCircuitReadBufferSize(conf);
  }

  /**
   * Check if the short-circuit read buffer size is set and if not, set it to the hbase value.
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) return;
    // But the short-circuit buffer size is normally not set. Put in place the size hbase wants.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }

  /**
   * Returns the DFSClient's DFSHedgedReadMetrics instance, or null if it can't be found or the
   * file system is not HDFS.
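   * <p>
   * A sketch of reading the metrics, assuming the usual {@code DFSHedgedReadMetrics} accessors
   * are available in the Hadoop version on the classpath:
   *
   * <pre>{@code
   * DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
   * if (metrics != null) {
   *   LOG.info("hedged reads={} wins={}", metrics.getHedgedReadOps(), metrics.getHedgedReadWins());
   * }
   * }</pre>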
   */
  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
    throws IOException {
    if (!CommonFSUtils.isHDFS(c)) {
      return null;
    }
    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
    final String name = "getHedgedReadMetrics";
    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
    Method m;
    try {
      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    } catch (SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    }
    m.setAccessible(true);
    try {
      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    }
  }

  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
    Configuration conf, int threads) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Void>> futures = new ArrayList<>();
    List<Path> traversedPaths;
    try {
      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (ExecutionException | InterruptedException | IOException e) {
      throw new IOException("Copy snapshot reference files failed", e);
    } finally {
      pool.shutdownNow();
    }
    return traversedPaths;
  }

  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
    Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
    List<Path> traversedPaths = new ArrayList<>();
    traversedPaths.add(dst);
    FileStatus currentFileStatus = srcFS.getFileStatus(src);
    if (currentFileStatus.isDirectory()) {
      if (!dstFS.mkdirs(dst)) {
        throw new IOException("Create directory failed: " + dst);
      }
      FileStatus[] subPaths = srcFS.listStatus(src);
      for (FileStatus subPath : subPaths) {
        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
      }
    } else {
      Future<Void> future = pool.submit(() -> {
        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
        return null;
      });
      futures.add(future);
    }
    return traversedPaths;
  }

  /** Returns a set containing all namenode addresses of fs */
  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
    Configuration conf) {
    Set<InetSocketAddress> addresses = new HashSet<>();
    String serviceName = fs.getCanonicalServiceName();

    if (serviceName.startsWith("ha-hdfs")) {
      try {
        Map<String, Map<String, InetSocketAddress>> addressMap =
          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
        if (addressMap.containsKey(nameService)) {
          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
            InetSocketAddress addr = e2.getValue();
            addresses.add(addr);
          }
        }
      } catch (Exception e) {
        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
      }
    } else {
      URI uri = fs.getUri();
      int port = uri.getPort();
      if (port < 0) {
        int idx = serviceName.indexOf(':');
        port = Integer.parseInt(serviceName.substring(idx + 1));
      }
      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
      addresses.add(addr);
    }

    return addresses;
  }

  /**
   * Checks whether srcFs and desFs point at the same HDFS file system.
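   * <p>
   * A hypothetical check before copying data between clusters; {@code srcUri} and {@code dstUri}
   * are illustrative:
   *
   * <pre>{@code
   * FileSystem srcFs = FileSystem.get(srcUri, conf);
   * FileSystem dstFs = FileSystem.get(dstUri, conf);
   * if (!FSUtils.isSameHdfs(conf, srcFs, dstFs)) {
   *   // the files must be copied across clusters rather than moved or linked in place
   * }
   * }</pre>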
   * @param conf  the Configuration of HBase
   * @param srcFs the source FileSystem
   * @param desFs the destination FileSystem
   * @return whether srcFs and desFs are on the same HDFS or not
   */
  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
    // getCanonicalServiceName gives both srcFs and desFs a unified format which contains
    // scheme, host and port.
    String srcServiceName = srcFs.getCanonicalServiceName();
    String desServiceName = desFs.getCanonicalServiceName();

    if (srcServiceName == null || desServiceName == null) {
      return false;
    }
    if (srcServiceName.equals(desServiceName)) {
      return true;
    }
    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
      Collection<String> internalNameServices =
        conf.getTrimmedStringCollection("dfs.internal.nameservices");
      if (!internalNameServices.isEmpty()) {
        if (internalNameServices.contains(srcServiceName.split(":")[1])) {
          return true;
        } else {
          return false;
        }
      }
    }
    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
      // If one serviceName is an HA format while the other is a non-HA format,
      // maybe they refer to the same FileSystem.
      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
        return true;
      }
    }

    return false;
  }
}