001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.List; 023import java.util.concurrent.atomic.AtomicReference; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.Abortable; 028import org.apache.hadoop.hbase.client.RegionInfo; 029import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager; 030import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; 031import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALStreamReader; 032import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALTailingReader; 033import org.apache.hadoop.hbase.util.CancelableProgressable; 034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 035import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; 036import org.apache.hadoop.hbase.wal.WALProvider.Writer; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 042 043/** 044 * Entry point for users of the Write Ahead Log. Acts as the shim between internal use and the 045 * particular WALProvider we use to handle wal requests. Configure which provider gets used with the 046 * configuration setting "hbase.wal.provider". Available implementations: 047 * <ul> 048 * <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently 049 * "asyncfs"</li> 050 * <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop 051 * FileSystem interface via an asynchronous client.</li> 052 * <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop 053 * FileSystem interface via HDFS's synchronous DFSClient.</li> 054 * <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region 055 * server.</li> 056 * </ul> 057 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name. 058 */ 059@InterfaceAudience.Private 060public class WALFactory { 061 062 /** 063 * Used in tests for injecting customized stream reader implementation, for example, inject fault 064 * when reading, etc. 065 * <p/> 066 * After removing the sequence file based WAL, we always use protobuf based WAL reader, and we 067 * will also determine whether the WAL file is encrypted and we should use 068 * {@link org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec} to decode by check the 069 * header of the WAL file, so we do not need to specify a specical reader to read the WAL file 070 * either. 071 * <p/> 072 * So typically you should not use this config in production. 073 */ 074 public static final String WAL_STREAM_READER_CLASS_IMPL = 075 "hbase.regionserver.wal.stream.reader.impl"; 076 077 private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class); 078 079 /** 080 * Maps between configuration names for providers and implementation classes. 081 */ 082 enum Providers { 083 defaultProvider(AsyncFSWALProvider.class), 084 filesystem(FSHLogProvider.class), 085 multiwal(RegionGroupingProvider.class), 086 asyncfs(AsyncFSWALProvider.class); 087 088 final Class<? extends WALProvider> clazz; 089 090 Providers(Class<? extends WALProvider> clazz) { 091 this.clazz = clazz; 092 } 093 } 094 095 public static final String WAL_PROVIDER = "hbase.wal.provider"; 096 static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name(); 097 098 public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider"; 099 100 public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled"; 101 102 final String factoryId; 103 final Abortable abortable; 104 private final WALProvider provider; 105 // The meta updates are written to a different wal. If this 106 // regionserver holds meta regions, then this ref will be non-null. 107 // lazily intialized; most RegionServers don't deal with META 108 private final AtomicReference<WALProvider> metaProvider = new AtomicReference<>(); 109 110 /** 111 * Configuration-specified WAL Reader used when a custom reader is requested 112 */ 113 private final Class<? extends WALStreamReader> walStreamReaderClass; 114 115 /** 116 * How long to attempt opening in-recovery wals 117 */ 118 private final int timeoutMillis; 119 120 private final Configuration conf; 121 122 private final ExcludeDatanodeManager excludeDatanodeManager; 123 124 // Used for the singleton WALFactory, see below. 125 private WALFactory(Configuration conf) { 126 // this code is duplicated here so we can keep our members final. 127 // until we've moved reader/writer construction down into providers, this initialization must 128 // happen prior to provider initialization, in case they need to instantiate a reader/writer. 129 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); 130 /* TODO Both of these are probably specific to the fs wal provider */ 131 walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL, 132 ProtobufWALStreamReader.class, WALStreamReader.class); 133 Preconditions.checkArgument( 134 AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass), 135 "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(), 136 AbstractFSWALProvider.Initializer.class.getName()); 137 this.conf = conf; 138 // end required early initialization 139 140 // this instance can't create wals, just reader/writers. 141 provider = null; 142 factoryId = SINGLETON_ID; 143 this.abortable = null; 144 this.excludeDatanodeManager = new ExcludeDatanodeManager(conf); 145 } 146 147 Providers getDefaultProvider() { 148 return Providers.defaultProvider; 149 } 150 151 public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) { 152 try { 153 Providers provider = Providers.valueOf(conf.get(key, defaultValue)); 154 155 // AsyncFSWALProvider is not guaranteed to work on all Hadoop versions, when it's chosen as 156 // the default and we can't use it, we want to fall back to FSHLog which we know works on 157 // all versions. 158 if ( 159 provider == getDefaultProvider() && provider.clazz == AsyncFSWALProvider.class 160 && !AsyncFSWALProvider.load() 161 ) { 162 // AsyncFSWAL has better performance in most cases, and also uses less resources, we will 163 // try to use it if possible. It deeply hacks into the internal of DFSClient so will be 164 // easily broken when upgrading hadoop. 165 LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider"); 166 return FSHLogProvider.class; 167 } 168 169 // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't 170 // support using it (e.g. AsyncFSWALProvider.load() == false), we should let this fail and 171 // not fall back to FSHLogProvider. 172 return provider.clazz; 173 } catch (IllegalArgumentException exception) { 174 // Fall back to them specifying a class name 175 // Note that the passed default class shouldn't actually be used, since the above only fails 176 // when there is a config value present. 177 return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class); 178 } 179 } 180 181 WALProvider createProvider(Class<? extends WALProvider> clazz, String providerId) 182 throws IOException { 183 LOG.info("Instantiating WALProvider of type " + clazz); 184 try { 185 final WALProvider result = clazz.getDeclaredConstructor().newInstance(); 186 result.init(this, conf, providerId, this.abortable); 187 return result; 188 } catch (Exception e) { 189 LOG.error("couldn't set up WALProvider, the configured class is " + clazz); 190 LOG.debug("Exception details for failure to load WALProvider.", e); 191 throw new IOException("couldn't set up WALProvider", e); 192 } 193 } 194 195 /** 196 * instantiate a provider from a config property. requires conf to have already been set (as well 197 * as anything the provider might need to read). 198 */ 199 WALProvider getProvider(String key, String defaultValue, String providerId) throws IOException { 200 Class<? extends WALProvider> clazz = getProviderClass(key, defaultValue); 201 WALProvider provider = createProvider(clazz, providerId); 202 provider.addWALActionsListener(new MetricsWAL()); 203 return provider; 204 } 205 206 public WALFactory(Configuration conf, String factoryId) throws IOException { 207 this(conf, factoryId, null); 208 } 209 210 /** 211 * @param conf must not be null, will keep a reference to read params in later reader/writer 212 * instances. 213 * @param abortable the server to abort 214 */ 215 public WALFactory(Configuration conf, String factoryId, Abortable abortable) throws IOException { 216 // until we've moved reader/writer construction down into providers, this initialization must 217 // happen prior to provider initialization, in case they need to instantiate a reader/writer. 218 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); 219 /* TODO Both of these are probably specific to the fs wal provider */ 220 walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL, 221 ProtobufWALStreamReader.class, WALStreamReader.class); 222 Preconditions.checkArgument( 223 AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass), 224 "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(), 225 AbstractFSWALProvider.Initializer.class.getName()); 226 this.conf = conf; 227 this.factoryId = factoryId; 228 this.excludeDatanodeManager = new ExcludeDatanodeManager(conf); 229 this.abortable = abortable; 230 // end required early initialization 231 if (conf.getBoolean(WAL_ENABLED, true)) { 232 provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null); 233 } else { 234 // special handling of existing configuration behavior. 235 LOG.warn("Running with WAL disabled."); 236 provider = new DisabledWALProvider(); 237 provider.init(this, conf, factoryId, null); 238 } 239 } 240 241 /** 242 * Shutdown all WALs and clean up any underlying storage. Use only when you will not need to 243 * replay and edits that have gone to any wals from this factory. 244 */ 245 public void close() throws IOException { 246 final WALProvider metaProvider = this.metaProvider.get(); 247 if (null != metaProvider) { 248 metaProvider.close(); 249 } 250 // close is called on a WALFactory with null provider in the case of contention handling 251 // within the getInstance method. 252 if (null != provider) { 253 provider.close(); 254 } 255 } 256 257 /** 258 * Tell the underlying WAL providers to shut down, but do not clean up underlying storage. If you 259 * are not ending cleanly and will need to replay edits from this factory's wals, use this method 260 * if you can as it will try to leave things as tidy as possible. 261 */ 262 public void shutdown() throws IOException { 263 IOException exception = null; 264 final WALProvider metaProvider = this.metaProvider.get(); 265 if (null != metaProvider) { 266 try { 267 metaProvider.shutdown(); 268 } catch (IOException ioe) { 269 exception = ioe; 270 } 271 } 272 provider.shutdown(); 273 if (null != exception) { 274 throw exception; 275 } 276 } 277 278 public List<WAL> getWALs() { 279 return provider.getWALs(); 280 } 281 282 /** 283 * Called when we lazily create a hbase:meta WAL OR from ReplicationSourceManager ahead of 284 * creating the first hbase:meta WAL so we can register a listener. 285 * @see #getMetaWALProvider() 286 */ 287 public WALProvider getMetaProvider() throws IOException { 288 for (;;) { 289 WALProvider provider = this.metaProvider.get(); 290 if (provider != null) { 291 return provider; 292 } 293 Class<? extends WALProvider> clz = null; 294 if (conf.get(META_WAL_PROVIDER) == null) { 295 try { 296 clz = conf.getClass(WAL_PROVIDER, Providers.defaultProvider.clazz, WALProvider.class); 297 } catch (Throwable t) { 298 // the WAL provider should be an enum. Proceed 299 } 300 } 301 if (clz == null) { 302 clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); 303 } 304 provider = createProvider(clz, AbstractFSWALProvider.META_WAL_PROVIDER_ID); 305 if (metaProvider.compareAndSet(null, provider)) { 306 return provider; 307 } else { 308 // someone is ahead of us, close and try again. 309 provider.close(); 310 } 311 } 312 } 313 314 /** 315 * @param region the region which we want to get a WAL for. Could be null. 316 */ 317 public WAL getWAL(RegionInfo region) throws IOException { 318 // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up. 319 if ( 320 region != null && region.isMetaRegion() 321 && region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID 322 ) { 323 return getMetaProvider().getWAL(region); 324 } else { 325 return provider.getWAL(region); 326 } 327 } 328 329 public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException { 330 return createStreamReader(fs, path, (CancelableProgressable) null); 331 } 332 333 /** 334 * Create a one-way stream reader for the WAL. 335 * @return A WAL reader. Close when done with it. 336 */ 337 public WALStreamReader createStreamReader(FileSystem fs, Path path, 338 CancelableProgressable reporter) throws IOException { 339 return createStreamReader(fs, path, reporter, -1); 340 } 341 342 /** 343 * Create a one-way stream reader for the WAL, and start reading from the given 344 * {@code startPosition}. 345 * @return A WAL reader. Close when done with it. 346 */ 347 public WALStreamReader createStreamReader(FileSystem fs, Path path, 348 CancelableProgressable reporter, long startPosition) throws IOException { 349 try { 350 // A wal file could be under recovery, so it may take several 351 // tries to get it open. Instead of claiming it is corrupted, retry 352 // to open it up to 5 minutes by default. 353 long startWaiting = EnvironmentEdgeManager.currentTime(); 354 long openTimeout = timeoutMillis + startWaiting; 355 int nbAttempt = 0; 356 WALStreamReader reader = null; 357 while (true) { 358 try { 359 reader = walStreamReaderClass.getDeclaredConstructor().newInstance(); 360 ((AbstractFSWALProvider.Initializer) reader).init(fs, path, conf, startPosition); 361 return reader; 362 } catch (Exception e) { 363 // catch Exception so that we close reader for all exceptions. If we don't 364 // close the reader, we leak a socket. 365 if (reader != null) { 366 reader.close(); 367 } 368 369 // Only inspect the Exception to consider retry when it's an IOException 370 if (e instanceof IOException) { 371 String msg = e.getMessage(); 372 if ( 373 msg != null && (msg.contains("Cannot obtain block length") 374 || msg.contains("Could not obtain the last block") 375 || msg.matches("Blocklist for [^ ]* has changed.*")) 376 ) { 377 if (++nbAttempt == 1) { 378 LOG.warn("Lease should have recovered. This is not expected. Will retry", e); 379 } 380 if (reporter != null && !reporter.progress()) { 381 throw new InterruptedIOException("Operation is cancelled"); 382 } 383 if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) { 384 LOG.error("Can't open after " + nbAttempt + " attempts and " 385 + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path); 386 } else { 387 try { 388 Thread.sleep(nbAttempt < 3 ? 500 : 1000); 389 continue; // retry 390 } catch (InterruptedException ie) { 391 InterruptedIOException iioe = new InterruptedIOException(); 392 iioe.initCause(ie); 393 throw iioe; 394 } 395 } 396 throw new LeaseNotRecoveredException(e); 397 } 398 } 399 400 // Rethrow the original exception if we are not retrying due to HDFS-isms. 401 throw e; 402 } 403 } 404 } catch (IOException ie) { 405 throw ie; 406 } catch (Exception e) { 407 throw new IOException("Cannot get log reader", e); 408 } 409 } 410 411 /** 412 * Create a writer for the WAL. Uses defaults. 413 * <p> 414 * Should be package-private. public only for tests and 415 * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor} 416 * @return A WAL writer. Close when done with it. 417 */ 418 public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException { 419 return FSHLogProvider.createWriter(conf, fs, path, false); 420 } 421 422 /** 423 * Should be package-private, visible for recovery testing. Uses defaults. 424 * @return an overwritable writer for recovered edits. caller should close. 425 */ 426 public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path) 427 throws IOException { 428 return FSHLogProvider.createWriter(conf, fs, path, true); 429 } 430 431 // These static methods are currently used where it's impractical to 432 // untangle the reliance on state in the filesystem. They rely on singleton 433 // WALFactory that just provides Reader / Writers. 434 // For now, first Configuration object wins. Practically this just impacts the reader/writer class 435 private static final AtomicReference<WALFactory> singleton = new AtomicReference<>(); 436 private static final String SINGLETON_ID = WALFactory.class.getName(); 437 438 // Public only for FSHLog 439 public static WALFactory getInstance(Configuration configuration) { 440 WALFactory factory = singleton.get(); 441 if (null == factory) { 442 WALFactory temp = new WALFactory(configuration); 443 if (singleton.compareAndSet(null, temp)) { 444 factory = temp; 445 } else { 446 // someone else beat us to initializing 447 try { 448 temp.close(); 449 } catch (IOException exception) { 450 LOG.debug("failed to close temporary singleton. ignoring.", exception); 451 } 452 factory = singleton.get(); 453 } 454 } 455 return factory; 456 } 457 458 /** 459 * Create a tailing reader for the given path. Mainly used in replication. 460 */ 461 public static WALTailingReader createTailingReader(FileSystem fs, Path path, Configuration conf, 462 long startPosition) throws IOException { 463 ProtobufWALTailingReader reader = new ProtobufWALTailingReader(); 464 reader.init(fs, path, conf, startPosition); 465 return reader; 466 } 467 468 /** 469 * Create a one-way stream reader for a given path. 470 */ 471 public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf) 472 throws IOException { 473 return createStreamReader(fs, path, conf, -1); 474 } 475 476 /** 477 * Create a one-way stream reader for a given path. 478 */ 479 public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf, 480 long startPosition) throws IOException { 481 return getInstance(conf).createStreamReader(fs, path, (CancelableProgressable) null, 482 startPosition); 483 } 484 485 /** 486 * If you already have a WALFactory, you should favor the instance method. Uses defaults. 487 * @return a Writer that will overwrite files. Caller must close. 488 */ 489 static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path, 490 final Configuration configuration) throws IOException { 491 return FSHLogProvider.createWriter(configuration, fs, path, true); 492 } 493 494 /** 495 * If you already have a WALFactory, you should favor the instance method. Uses defaults. 496 * @return a writer that won't overwrite files. Caller must close. 497 */ 498 public static Writer createWALWriter(final FileSystem fs, final Path path, 499 final Configuration configuration) throws IOException { 500 return FSHLogProvider.createWriter(configuration, fs, path, false); 501 } 502 503 public String getFactoryId() { 504 return factoryId; 505 } 506 507 public WALProvider getWALProvider() { 508 return this.provider; 509 } 510 511 /** 512 * @return Current metaProvider... may be null if not yet initialized. 513 * @see #getMetaProvider() 514 */ 515 public WALProvider getMetaWALProvider() { 516 return this.metaProvider.get(); 517 } 518 519 public ExcludeDatanodeManager getExcludeDatanodeManager() { 520 return excludeDatanodeManager; 521 } 522}