001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALStreamReader;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALTailingReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
042
043/**
044 * Entry point for users of the Write Ahead Log. Acts as the shim between internal use and the
045 * particular WALProvider we use to handle wal requests. Configure which provider gets used with the
046 * configuration setting "hbase.wal.provider". Available implementations:
047 * <ul>
048 * <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently
049 * "asyncfs"</li>
050 * <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop
051 * FileSystem interface via an asynchronous client.</li>
052 * <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop
053 * FileSystem interface via HDFS's synchronous DFSClient.</li>
054 * <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region
055 * server.</li>
056 * </ul>
057 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
058 */
059@InterfaceAudience.Private
060public class WALFactory {
061
062  /**
063   * Used in tests for injecting customized stream reader implementation, for example, inject fault
064   * when reading, etc.
065   * <p/>
066   * After removing the sequence file based WAL, we always use protobuf based WAL reader, and we
067   * will also determine whether the WAL file is encrypted and we should use
068   * {@link org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec} to decode by check the
069   * header of the WAL file, so we do not need to specify a specical reader to read the WAL file
070   * either.
071   * <p/>
072   * So typically you should not use this config in production.
073   */
074  public static final String WAL_STREAM_READER_CLASS_IMPL =
075    "hbase.regionserver.wal.stream.reader.impl";
076
077  private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class);
078
079  /**
080   * Maps between configuration names for providers and implementation classes.
081   */
082  enum Providers {
083    defaultProvider(AsyncFSWALProvider.class),
084    filesystem(FSHLogProvider.class),
085    multiwal(RegionGroupingProvider.class),
086    asyncfs(AsyncFSWALProvider.class);
087
088    final Class<? extends WALProvider> clazz;
089
090    Providers(Class<? extends WALProvider> clazz) {
091      this.clazz = clazz;
092    }
093  }
094
095  public static final String WAL_PROVIDER = "hbase.wal.provider";
096  static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();
097
098  public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
099
100  public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
101
102  final String factoryId;
103  final Abortable abortable;
104  private final WALProvider provider;
105  // The meta updates are written to a different wal. If this
106  // regionserver holds meta regions, then this ref will be non-null.
107  // lazily intialized; most RegionServers don't deal with META
108  private final AtomicReference<WALProvider> metaProvider = new AtomicReference<>();
109
110  /**
111   * Configuration-specified WAL Reader used when a custom reader is requested
112   */
113  private final Class<? extends WALStreamReader> walStreamReaderClass;
114
115  /**
116   * How long to attempt opening in-recovery wals
117   */
118  private final int timeoutMillis;
119
120  private final Configuration conf;
121
122  private final ExcludeDatanodeManager excludeDatanodeManager;
123
124  // Used for the singleton WALFactory, see below.
125  private WALFactory(Configuration conf) {
126    // this code is duplicated here so we can keep our members final.
127    // until we've moved reader/writer construction down into providers, this initialization must
128    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
129    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
130    /* TODO Both of these are probably specific to the fs wal provider */
131    walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL,
132      ProtobufWALStreamReader.class, WALStreamReader.class);
133    Preconditions.checkArgument(
134      AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass),
135      "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(),
136      AbstractFSWALProvider.Initializer.class.getName());
137    this.conf = conf;
138    // end required early initialization
139
140    // this instance can't create wals, just reader/writers.
141    provider = null;
142    factoryId = SINGLETON_ID;
143    this.abortable = null;
144    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
145  }
146
147  Providers getDefaultProvider() {
148    return Providers.defaultProvider;
149  }
150
151  public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
152    try {
153      Providers provider = Providers.valueOf(conf.get(key, defaultValue));
154
155      // AsyncFSWALProvider is not guaranteed to work on all Hadoop versions, when it's chosen as
156      // the default and we can't use it, we want to fall back to FSHLog which we know works on
157      // all versions.
158      if (
159        provider == getDefaultProvider() && provider.clazz == AsyncFSWALProvider.class
160          && !AsyncFSWALProvider.load()
161      ) {
162        // AsyncFSWAL has better performance in most cases, and also uses less resources, we will
163        // try to use it if possible. It deeply hacks into the internal of DFSClient so will be
164        // easily broken when upgrading hadoop.
165        LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider");
166        return FSHLogProvider.class;
167      }
168
169      // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't
170      // support using it (e.g. AsyncFSWALProvider.load() == false), we should let this fail and
171      // not fall back to FSHLogProvider.
172      return provider.clazz;
173    } catch (IllegalArgumentException exception) {
174      // Fall back to them specifying a class name
175      // Note that the passed default class shouldn't actually be used, since the above only fails
176      // when there is a config value present.
177      return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class);
178    }
179  }
180
181  WALProvider createProvider(Class<? extends WALProvider> clazz, String providerId)
182    throws IOException {
183    LOG.info("Instantiating WALProvider of type " + clazz);
184    try {
185      final WALProvider result = clazz.getDeclaredConstructor().newInstance();
186      result.init(this, conf, providerId, this.abortable);
187      return result;
188    } catch (Exception e) {
189      LOG.error("couldn't set up WALProvider, the configured class is " + clazz);
190      LOG.debug("Exception details for failure to load WALProvider.", e);
191      throw new IOException("couldn't set up WALProvider", e);
192    }
193  }
194
195  /**
196   * instantiate a provider from a config property. requires conf to have already been set (as well
197   * as anything the provider might need to read).
198   */
199  WALProvider getProvider(String key, String defaultValue, String providerId) throws IOException {
200    Class<? extends WALProvider> clazz = getProviderClass(key, defaultValue);
201    WALProvider provider = createProvider(clazz, providerId);
202    provider.addWALActionsListener(new MetricsWAL());
203    return provider;
204  }
205
206  public WALFactory(Configuration conf, String factoryId) throws IOException {
207    this(conf, factoryId, null);
208  }
209
210  /**
211   * @param conf      must not be null, will keep a reference to read params in later reader/writer
212   *                  instances.
213   * @param abortable the server to abort
214   */
215  public WALFactory(Configuration conf, String factoryId, Abortable abortable) throws IOException {
216    // until we've moved reader/writer construction down into providers, this initialization must
217    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
218    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
219    /* TODO Both of these are probably specific to the fs wal provider */
220    walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL,
221      ProtobufWALStreamReader.class, WALStreamReader.class);
222    Preconditions.checkArgument(
223      AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass),
224      "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(),
225      AbstractFSWALProvider.Initializer.class.getName());
226    this.conf = conf;
227    this.factoryId = factoryId;
228    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
229    this.abortable = abortable;
230    // end required early initialization
231    if (conf.getBoolean(WAL_ENABLED, true)) {
232      provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null);
233    } else {
234      // special handling of existing configuration behavior.
235      LOG.warn("Running with WAL disabled.");
236      provider = new DisabledWALProvider();
237      provider.init(this, conf, factoryId, null);
238    }
239  }
240
241  /**
242   * Shutdown all WALs and clean up any underlying storage. Use only when you will not need to
243   * replay and edits that have gone to any wals from this factory.
244   */
245  public void close() throws IOException {
246    final WALProvider metaProvider = this.metaProvider.get();
247    if (null != metaProvider) {
248      metaProvider.close();
249    }
250    // close is called on a WALFactory with null provider in the case of contention handling
251    // within the getInstance method.
252    if (null != provider) {
253      provider.close();
254    }
255  }
256
257  /**
258   * Tell the underlying WAL providers to shut down, but do not clean up underlying storage. If you
259   * are not ending cleanly and will need to replay edits from this factory's wals, use this method
260   * if you can as it will try to leave things as tidy as possible.
261   */
262  public void shutdown() throws IOException {
263    IOException exception = null;
264    final WALProvider metaProvider = this.metaProvider.get();
265    if (null != metaProvider) {
266      try {
267        metaProvider.shutdown();
268      } catch (IOException ioe) {
269        exception = ioe;
270      }
271    }
272    provider.shutdown();
273    if (null != exception) {
274      throw exception;
275    }
276  }
277
278  public List<WAL> getWALs() {
279    return provider.getWALs();
280  }
281
282  /**
283   * Called when we lazily create a hbase:meta WAL OR from ReplicationSourceManager ahead of
284   * creating the first hbase:meta WAL so we can register a listener.
285   * @see #getMetaWALProvider()
286   */
287  public WALProvider getMetaProvider() throws IOException {
288    for (;;) {
289      WALProvider provider = this.metaProvider.get();
290      if (provider != null) {
291        return provider;
292      }
293      Class<? extends WALProvider> clz = null;
294      if (conf.get(META_WAL_PROVIDER) == null) {
295        try {
296          clz = conf.getClass(WAL_PROVIDER, Providers.defaultProvider.clazz, WALProvider.class);
297        } catch (Throwable t) {
298          // the WAL provider should be an enum. Proceed
299        }
300      }
301      if (clz == null) {
302        clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
303      }
304      provider = createProvider(clz, AbstractFSWALProvider.META_WAL_PROVIDER_ID);
305      if (metaProvider.compareAndSet(null, provider)) {
306        return provider;
307      } else {
308        // someone is ahead of us, close and try again.
309        provider.close();
310      }
311    }
312  }
313
314  /**
315   * @param region the region which we want to get a WAL for. Could be null.
316   */
317  public WAL getWAL(RegionInfo region) throws IOException {
318    // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up.
319    if (
320      region != null && region.isMetaRegion()
321        && region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID
322    ) {
323      return getMetaProvider().getWAL(region);
324    } else {
325      return provider.getWAL(region);
326    }
327  }
328
329  public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException {
330    return createStreamReader(fs, path, (CancelableProgressable) null);
331  }
332
333  /**
334   * Create a one-way stream reader for the WAL.
335   * @return A WAL reader. Close when done with it.
336   */
337  public WALStreamReader createStreamReader(FileSystem fs, Path path,
338    CancelableProgressable reporter) throws IOException {
339    return createStreamReader(fs, path, reporter, -1);
340  }
341
342  /**
343   * Create a one-way stream reader for the WAL, and start reading from the given
344   * {@code startPosition}.
345   * @return A WAL reader. Close when done with it.
346   */
347  public WALStreamReader createStreamReader(FileSystem fs, Path path,
348    CancelableProgressable reporter, long startPosition) throws IOException {
349    try {
350      // A wal file could be under recovery, so it may take several
351      // tries to get it open. Instead of claiming it is corrupted, retry
352      // to open it up to 5 minutes by default.
353      long startWaiting = EnvironmentEdgeManager.currentTime();
354      long openTimeout = timeoutMillis + startWaiting;
355      int nbAttempt = 0;
356      WALStreamReader reader = null;
357      while (true) {
358        try {
359          reader = walStreamReaderClass.getDeclaredConstructor().newInstance();
360          ((AbstractFSWALProvider.Initializer) reader).init(fs, path, conf, startPosition);
361          return reader;
362        } catch (Exception e) {
363          // catch Exception so that we close reader for all exceptions. If we don't
364          // close the reader, we leak a socket.
365          if (reader != null) {
366            reader.close();
367          }
368
369          // Only inspect the Exception to consider retry when it's an IOException
370          if (e instanceof IOException) {
371            String msg = e.getMessage();
372            if (
373              msg != null && (msg.contains("Cannot obtain block length")
374                || msg.contains("Could not obtain the last block")
375                || msg.matches("Blocklist for [^ ]* has changed.*"))
376            ) {
377              if (++nbAttempt == 1) {
378                LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
379              }
380              if (reporter != null && !reporter.progress()) {
381                throw new InterruptedIOException("Operation is cancelled");
382              }
383              if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
384                LOG.error("Can't open after " + nbAttempt + " attempts and "
385                  + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
386              } else {
387                try {
388                  Thread.sleep(nbAttempt < 3 ? 500 : 1000);
389                  continue; // retry
390                } catch (InterruptedException ie) {
391                  InterruptedIOException iioe = new InterruptedIOException();
392                  iioe.initCause(ie);
393                  throw iioe;
394                }
395              }
396              throw new LeaseNotRecoveredException(e);
397            }
398          }
399
400          // Rethrow the original exception if we are not retrying due to HDFS-isms.
401          throw e;
402        }
403      }
404    } catch (IOException ie) {
405      throw ie;
406    } catch (Exception e) {
407      throw new IOException("Cannot get log reader", e);
408    }
409  }
410
411  /**
412   * Create a writer for the WAL. Uses defaults.
413   * <p>
414   * Should be package-private. public only for tests and
415   * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
416   * @return A WAL writer. Close when done with it.
417   */
418  public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
419    return FSHLogProvider.createWriter(conf, fs, path, false);
420  }
421
422  /**
423   * Should be package-private, visible for recovery testing. Uses defaults.
424   * @return an overwritable writer for recovered edits. caller should close.
425   */
426  public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
427    throws IOException {
428    return FSHLogProvider.createWriter(conf, fs, path, true);
429  }
430
431  // These static methods are currently used where it's impractical to
432  // untangle the reliance on state in the filesystem. They rely on singleton
433  // WALFactory that just provides Reader / Writers.
434  // For now, first Configuration object wins. Practically this just impacts the reader/writer class
435  private static final AtomicReference<WALFactory> singleton = new AtomicReference<>();
436  private static final String SINGLETON_ID = WALFactory.class.getName();
437
438  // Public only for FSHLog
439  public static WALFactory getInstance(Configuration configuration) {
440    WALFactory factory = singleton.get();
441    if (null == factory) {
442      WALFactory temp = new WALFactory(configuration);
443      if (singleton.compareAndSet(null, temp)) {
444        factory = temp;
445      } else {
446        // someone else beat us to initializing
447        try {
448          temp.close();
449        } catch (IOException exception) {
450          LOG.debug("failed to close temporary singleton. ignoring.", exception);
451        }
452        factory = singleton.get();
453      }
454    }
455    return factory;
456  }
457
458  /**
459   * Create a tailing reader for the given path. Mainly used in replication.
460   */
461  public static WALTailingReader createTailingReader(FileSystem fs, Path path, Configuration conf,
462    long startPosition) throws IOException {
463    ProtobufWALTailingReader reader = new ProtobufWALTailingReader();
464    reader.init(fs, path, conf, startPosition);
465    return reader;
466  }
467
468  /**
469   * Create a one-way stream reader for a given path.
470   */
471  public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf)
472    throws IOException {
473    return createStreamReader(fs, path, conf, -1);
474  }
475
476  /**
477   * Create a one-way stream reader for a given path.
478   */
479  public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf,
480    long startPosition) throws IOException {
481    return getInstance(conf).createStreamReader(fs, path, (CancelableProgressable) null,
482      startPosition);
483  }
484
485  /**
486   * If you already have a WALFactory, you should favor the instance method. Uses defaults.
487   * @return a Writer that will overwrite files. Caller must close.
488   */
489  static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
490    final Configuration configuration) throws IOException {
491    return FSHLogProvider.createWriter(configuration, fs, path, true);
492  }
493
494  /**
495   * If you already have a WALFactory, you should favor the instance method. Uses defaults.
496   * @return a writer that won't overwrite files. Caller must close.
497   */
498  public static Writer createWALWriter(final FileSystem fs, final Path path,
499    final Configuration configuration) throws IOException {
500    return FSHLogProvider.createWriter(configuration, fs, path, false);
501  }
502
503  public String getFactoryId() {
504    return factoryId;
505  }
506
507  public WALProvider getWALProvider() {
508    return this.provider;
509  }
510
511  /**
512   * @return Current metaProvider... may be null if not yet initialized.
513   * @see #getMetaProvider()
514   */
515  public WALProvider getMetaWALProvider() {
516    return this.metaProvider.get();
517  }
518
519  public ExcludeDatanodeManager getExcludeDatanodeManager() {
520    return excludeDatanodeManager;
521  }
522}