/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.fs;

import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InaccessibleObjectException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Progressable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An encapsulation for the FileSystem object that hbase uses to access data. This class allows the
 * flexibility of using separate filesystem objects for reading and writing hfiles and wals.
 */
@InterfaceAudience.Private
public class HFileSystem extends FilterFileSystem {
  public static final Logger LOG = LoggerFactory.getLogger(HFileSystem.class);

  private final FileSystem noChecksumFs; // read hfile data from storage
  private final boolean useHBaseChecksum;
  private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE;

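  /*
   * Illustrative usage sketch (not part of the upstream class): constructing an HFileSystem with
   * HBase-level checksums enabled and obtaining the read-side filesystem. Assumes an
   * HBaseConfiguration is available; error handling is omitted.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   HFileSystem hfs = new HFileSystem(conf, true);   // hbase verifies hfile checksums itself
   *   FileSystem readFs = hfs.getNoChecksumFs();       // fs-level checksum verification is off
   *   FileSystem writeFs = hfs.getBackingFs();         // fs-level checksum verification stays on
   */
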
  /**
   * Create a FileSystem object for HBase regionservers.
   * @param conf             The configuration to be used for the filesystem
   * @param useHBaseChecksum if true, then use checksum verification in hbase, otherwise delegate
   *                         checksum verification to the FileSystem.
   */
  public HFileSystem(Configuration conf, boolean useHBaseChecksum) throws IOException {

    // Create the default filesystem with checksum verification switched on.
    // By default, any operation to this FilterFileSystem occurs on
    // the underlying filesystem that has checksums switched on.
    // This FS#get(URI, conf) clearly indicates in the javadoc that if the FS is
    // not created it will initialize the FS and return that created FS. If it is
    // already created it will just return the FS that was already created.
    // We take pains to funnel all of our FileSystem instantiation through this call to ensure
    // we never need to call FS.initialize ourself so that we do not have to track any state to
    // avoid calling initialize more than once.
    this.fs = FileSystem.get(getDefaultUri(conf), conf);
    this.useHBaseChecksum = useHBaseChecksum;

    // disable checksum verification for local fileSystem, see HBASE-11218
    if (fs instanceof LocalFileSystem) {
      fs.setWriteChecksum(false);
      fs.setVerifyChecksum(false);
    }

    addLocationsOrderInterceptor(conf);

    // If hbase checksum verification is switched on, then create a new
    // filesystem object that has checksum verification turned off.
    // We will avoid verifying checksums in the fs client, instead doing it
    // inside of hbase.
    // If this is the local file system, hadoop has a bug where seeks
    // do not go to the correct location if setVerifyChecksum(false) is called.
    // This manifests itself in that incorrect data is read and HFileBlocks won't be able to read
    // their header magic numbers. See HBASE-5885
    if (useHBaseChecksum && !(fs instanceof LocalFileSystem)) {
      conf = new Configuration(conf);
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
      this.noChecksumFs = maybeWrapFileSystem(newInstanceFileSystem(conf), conf);
      this.noChecksumFs.setVerifyChecksum(false);
    } else {
      this.noChecksumFs = maybeWrapFileSystem(fs, conf);
    }

    this.fs = maybeWrapFileSystem(this.fs, conf);
  }

  /**
   * Wrap a FileSystem object within an HFileSystem. The noChecksumFs and writeFs are both set to
   * the same specified fs. Do not verify hbase-checksums while reading data from the filesystem.
   * @param fs Set the noChecksumFs and writeFs to this specified filesystem.
   */
  public HFileSystem(FileSystem fs) {
    this.fs = fs;
    this.noChecksumFs = fs;
    this.useHBaseChecksum = false;
  }

  /**
   * Returns the filesystem that is specially set up for doing reads from storage. This object
   * avoids doing checksum verifications for reads.
   * @return The FileSystem object that can be used to read data from files.
   */
  public FileSystem getNoChecksumFs() {
    return noChecksumFs;
  }

  /**
   * Returns the underlying filesystem.
   * @return The underlying FileSystem for this FilterFileSystem object.
   */
  public FileSystem getBackingFs() throws IOException {
    return fs;
  }

  /**
   * Set the source path (directory/file) to the specified storage policy.
   * @param path       The source path (directory/file).
   * @param policyName The name of the storage policy: 'HOT', 'COLD', etc. See hadoop 2.6+
   *                   org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g.
   *                   'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
   */
  public void setStoragePolicy(Path path, String policyName) {
    CommonFSUtils.setStoragePolicy(this.fs, path, policyName);
  }

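  /*
   * Illustrative sketch (not part of the upstream class): pinning a directory to one of the HDFS
   * built-in policies named above and reading the policy back. The path is hypothetical, and hfs
   * is an HFileSystem instance.
   *
   *   hfs.setStoragePolicy(new Path("/hbase/data/default/t1"), "ONE_SSD");
   *   String policy = hfs.getStoragePolicyName(new Path("/hbase/data/default/t1"));
   *   // policy is "ONE_SSD" on HDFS, or null if the filesystem does not expose storage policies
   */
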
  /**
   * Get the storage policy of the source path (directory/file).
   * @param path The source path (directory/file).
   * @return Storage policy name, or {@code null} if not using {@link DistributedFileSystem} or
   *         exception thrown when trying to get policy
   */
  @Nullable
  public String getStoragePolicyName(Path path) {
    try {
      Object blockStoragePolicySpi =
        ReflectionUtils.invokeMethod(this.fs, "getStoragePolicy", path);
      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
    } catch (Exception e) {
      // Maybe fail because of using an old HDFS version, try the old way
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to get policy directly", e);
      }
      return getStoragePolicyForOldHDFSVersion(path);
    }
  }

  /**
   * Before Hadoop 2.8.0, there is no getStoragePolicy method in the FileSystem interface, so we
   * need to stay compatible with it. See HADOOP-12161 for more details.
   * @param path Path to get storage policy against
   * @return the storage policy name
   */
  private String getStoragePolicyForOldHDFSVersion(Path path) {
    try {
      if (this.fs instanceof DistributedFileSystem) {
        DistributedFileSystem dfs = (DistributedFileSystem) this.fs;
        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
        if (null != status) {
          if (unspecifiedStoragePolicyId < 0) {
            // Get the unspecified id field through reflection to avoid compilation error.
            // In later versions BlockStoragePolicySuite#ID_UNSPECIFIED is moved to
            // HdfsConstants#BLOCK_STORAGE_POLICY_ID_UNSPECIFIED
            Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
            unspecifiedStoragePolicyId = idUnspecified.getByte(BlockStoragePolicySuite.class);
          }
          byte storagePolicyId = status.getStoragePolicy();
          if (storagePolicyId != unspecifiedStoragePolicyId) {
            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
            for (BlockStoragePolicy policy : policies) {
              if (policy.getId() == storagePolicyId) {
                return policy.getName();
              }
            }
          }
        }
      }
    } catch (Throwable e) {
      LOG.warn("failed to get block storage policy of [" + path + "]", e);
    }

    return null;
  }

  /**
   * Are we verifying checksums in HBase?
   * @return True, if hbase is configured to verify checksums, otherwise false.
   */
  public boolean useHBaseChecksum() {
    return useHBaseChecksum;
  }

  /**
   * Close this filesystem object.
   */
  @Override
  public void close() throws IOException {
    super.close();
    if (this.noChecksumFs != fs) {
      this.noChecksumFs.close();
    }
  }

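  /*
   * Illustrative sketch (not part of the upstream class): with the single-argument constructor,
   * the same fs is used for both roles, so close() above closes nothing extra and hbase-level
   * checksum verification is reported as disabled.
   *
   *   HFileSystem wrapped = new HFileSystem(FileSystem.get(conf));
   *   boolean hbaseChecksums = wrapped.useHBaseChecksum();                    // false
   *   boolean sameFs = wrapped.getNoChecksumFs() == wrapped.getBackingFs();   // true
   */
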
  /**
   * Returns a brand new instance of the FileSystem. It does not use the FileSystem.Cache. In newer
   * versions of HDFS, we can directly invoke FileSystem.newInstance(Configuration).
   * @param conf Configuration
   * @return A new instance of the filesystem
   */
  private static FileSystem newInstanceFileSystem(Configuration conf) throws IOException {
    URI uri = FileSystem.getDefaultUri(conf);
    FileSystem fs = null;
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    if (clazz != null) {
      // This will be true for Hadoop 1.0, or 0.20.
      fs = (FileSystem) org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
      fs.initialize(uri, conf);
    } else {
      // For Hadoop 2.0, we have to go through FileSystem for the filesystem
      // implementation to be loaded by the service loader in case it has not
      // been loaded yet.
      Configuration clone = new Configuration(conf);
      clone.setBoolean("fs." + uri.getScheme() + ".impl.disable.cache", true);
      fs = FileSystem.get(uri, clone);
    }
    if (fs == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }

    return fs;
  }

  /**
   * Returns an instance of the filesystem wrapped into the class specified in the hbase.fs.wrapper
   * property, if one is set in the configuration; otherwise returns the unmodified FS instance
   * passed in as an argument.
   * @param base Filesystem instance to wrap
   * @param conf Configuration
   * @return wrapped instance of FS, or the same instance if no wrapping configured.
   */
  private FileSystem maybeWrapFileSystem(FileSystem base, Configuration conf) {
    try {
      Class<?> clazz = conf.getClass("hbase.fs.wrapper", null);
      if (clazz != null) {
        return (FileSystem) clazz.getConstructor(FileSystem.class, Configuration.class)
          .newInstance(base, conf);
      }
    } catch (Exception e) {
      LOG.error("Failed to wrap filesystem: " + e);
    }
    return base;
  }

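  /*
   * Illustrative sketch (not part of the upstream class): enabling wrapping via the
   * hbase.fs.wrapper property. MyWrappedFileSystem is a hypothetical FilterFileSystem subclass
   * that must expose a (FileSystem, Configuration) constructor, as maybeWrapFileSystem above
   * requires.
   *
   *   conf.setClass("hbase.fs.wrapper", MyWrappedFileSystem.class, FileSystem.class);
   *   HFileSystem hfs = new HFileSystem(conf, true);   // reads and writes now go through the wrapper
   */
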
  public static boolean addLocationsOrderInterceptor(Configuration conf) throws IOException {
    return addLocationsOrderInterceptor(conf, new ReorderWALBlocks());
  }

  /**
   * Add an interceptor on the calls to the namenode#getBlockLocations from the DFSClient linked to
   * this FileSystem. See HBASE-6435 for the background.
   * <p/>
   * There should be no reason, except testing, to create a specific ReorderBlocks.
   * @return true if the interceptor was added, false otherwise.
   */
  static boolean addLocationsOrderInterceptor(Configuration conf, final ReorderBlocks lrb) {
    if (!conf.getBoolean("hbase.filesystem.reorder.blocks", true)) { // activated by default
      LOG.debug("addLocationsOrderInterceptor configured to false");
      return false;
    }

    FileSystem fs;
    try {
      fs = FileSystem.get(conf);
    } catch (IOException e) {
      LOG.warn("Can't get the file system from the conf.", e);
      return false;
    }

    if (!(fs instanceof DistributedFileSystem)) {
      LOG.debug("The file system is not a DistributedFileSystem. "
        + "Skipping block location reordering");
      return false;
    }

    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    DFSClient dfsc = dfs.getClient();
    if (dfsc == null) {
      LOG.warn("The DistributedFileSystem does not contain a DFSClient. Can't add the location "
        + "block reordering interceptor. Continuing, but this is unexpected.");
      return false;
    }

    try {
      Field nf = DFSClient.class.getDeclaredField("namenode");
      nf.setAccessible(true);
      Field modifiersField = ReflectionUtils.getModifiersField();
      modifiersField.setAccessible(true);
      modifiersField.setInt(nf, nf.getModifiers() & ~Modifier.FINAL);

      ClientProtocol namenode = (ClientProtocol) nf.get(dfsc);
      if (namenode == null) {
        LOG.warn("The DFSClient is not linked to a namenode. Can't add the location block"
          + " reordering interceptor. Continuing, but this is unexpected.");
        return false;
      }

      ClientProtocol cp1 = createReorderingProxy(namenode, lrb, conf);
      nf.set(dfsc, cp1);
      LOG.info("Added intercepting call to namenode#getBlockLocations so we can do block"
        + " reordering using class " + lrb.getClass().getName());
    } catch (NoSuchFieldException | IllegalAccessException | InaccessibleObjectException e) {
      LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
      return false;
    }

    return true;
  }

  private static ClientProtocol createReorderingProxy(final ClientProtocol cp,
    final ReorderBlocks lrb, final Configuration conf) {
    return (ClientProtocol) Proxy.newProxyInstance(cp.getClass().getClassLoader(),
      new Class[] { ClientProtocol.class, Closeable.class }, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
          try {
            if ((args == null || args.length == 0) && "close".equals(method.getName())) {
              RPC.stopProxy(cp);
              return null;
            } else {
              Object res = method.invoke(cp, args);
              if (
                res != null && args != null && args.length == 3
                  && "getBlockLocations".equals(method.getName()) && res instanceof LocatedBlocks
                  && args[0] instanceof String && args[0] != null
              ) {
                lrb.reorderBlocks(conf, (LocatedBlocks) res, (String) args[0]);
              }
              return res;
            }
          } catch (InvocationTargetException ite) {
            // We will have this for all the exceptions, checked or not, sent
            // by any layer, including the functional exceptions
            Throwable cause = ite.getCause();
            if (cause == null) {
              throw new RuntimeException("Proxy invocation failed and getCause is null", ite);
            }
            if (cause instanceof UndeclaredThrowableException) {
              Throwable causeCause = cause.getCause();
              if (causeCause == null) {
                throw new RuntimeException("UndeclaredThrowableException had null cause!");
              }
              cause = causeCause;
            }
            throw cause;
          }
        }
      });
  }

  /**
   * Interface to implement to add specific reordering logic in hdfs.
   */
  interface ReorderBlocks {
    /**
     * @param conf the conf to use
     * @param lbs  the LocatedBlocks to reorder
     * @param src  the file name currently read
     * @throws IOException if something went wrong
     */
    void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src) throws IOException;
  }

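  /*
   * Illustrative sketch (not part of the upstream class): as noted in the javadoc above, a custom
   * ReorderBlocks is only expected in tests. A no-op implementation can be installed through the
   * package-private overload, assuming the caller lives in the same package:
   *
   *   ReorderBlocks noop = (conf, lbs, src) -> { };
   *   HFileSystem.addLocationsOrderInterceptor(conf, noop);
   */
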
  /**
   * We're putting at lowest priority the WAL file blocks that are on the same datanode as the
   * original regionserver which created these files. This is because we fear that the datanode is
   * actually dead, so if we use it, it will time out.
   */
  static class ReorderWALBlocks implements ReorderBlocks {
    @Override
    public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)
      throws IOException {

      ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, src);
      if (sn == null) {
        // It's not a WAL
        return;
      }

      // Ok, so it's a WAL
      String hostName = sn.getHostname();
      if (LOG.isTraceEnabled()) {
        LOG.trace(src + " is a WAL file, so reordering blocks, last hostname will be:" + hostName);
      }

      // Just check for all blocks
      for (LocatedBlock lb : lbs.getLocatedBlocks()) {
        DatanodeInfo[] dnis = getLocatedBlockLocations(lb);
        if (dnis != null && dnis.length > 1) {
          boolean found = false;
          for (int i = 0; i < dnis.length - 1 && !found; i++) {
            if (hostName.equals(dnis[i].getHostName())) {
              // advance the other locations by one and put this one at the last place.
              DatanodeInfo toLast = dnis[i];
              System.arraycopy(dnis, i + 1, dnis, i, dnis.length - i - 1);
              dnis[dnis.length - 1] = toLast;
              found = true;
            }
          }
        }
      }
    }
  }

  /**
   * Create a new HFileSystem object, similar to FileSystem.get(). This returns a filesystem object
   * that avoids checksum verification in the filesystem for hfileblock-reads. For these blocks,
   * checksum verification is done by HBase.
   */
  public static FileSystem get(Configuration conf) throws IOException {
    return new HFileSystem(conf, true);
  }

  /**
   * The org.apache.hadoop.fs.FilterFileSystem does not yet support createNonRecursive. This is a
   * hadoop bug and when it is fixed in Hadoop, this definition will go away.
   */
  @Override
  @SuppressWarnings("deprecation")
  public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize,
    short replication, long blockSize, Progressable progress) throws IOException {
    return fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, progress);
  }
}