001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import org.apache.hadoop.conf.Configuration; 022import org.apache.hadoop.fs.FileSystem; 023import org.apache.hadoop.fs.Path; 024import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput; 025import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper; 026import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper; 027import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 028import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; 029import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter; 030import org.apache.hadoop.hbase.regionserver.wal.WALUtil; 031import org.apache.hadoop.hbase.util.CommonFSUtils; 032import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 033import org.apache.hadoop.hbase.util.Pair; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.apache.yetus.audience.InterfaceStability; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 040import org.apache.hbase.thirdparty.io.netty.channel.Channel; 041import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 042import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 043import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 044import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory; 045 046/** 047 * A WAL provider that use {@link AsyncFSWAL}. 048 */ 049@InterfaceAudience.Private 050@InterfaceStability.Evolving 051public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> { 052 053 private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWALProvider.class); 054 055 public static final String WRITER_IMPL = "hbase.regionserver.wal.async.writer.impl"; 056 057 // Only public so classes back in regionserver.wal can access 058 public interface AsyncWriter extends WALProvider.AsyncWriter { 059 /** 060 * @throws IOException if something goes wrong initializing an output stream 061 * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that 062 * meet the needs of the given Writer implementation. 063 */ 064 void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize, 065 StreamSlowMonitor monitor) throws IOException, CommonFSUtils.StreamLacksCapabilityException; 066 } 067 068 /** 069 * Protected visibility for used in tests. 070 */ 071 protected EventLoopGroup eventLoopGroup; 072 073 /** 074 * Protected visibility for used in tests. 075 */ 076 protected Class<? extends Channel> channelClass; 077 078 @Override 079 protected AsyncFSWAL createWAL() throws IOException { 080 return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable, 081 CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId), 082 getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix, 083 META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup, 084 channelClass, factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId)); 085 } 086 087 @Override 088 protected void doInit(Configuration conf) throws IOException { 089 Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass = 090 NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); 091 if (eventLoopGroupAndChannelClass != null) { 092 eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); 093 channelClass = eventLoopGroupAndChannelClass.getSecond(); 094 } else { 095 eventLoopGroup = 096 new NioEventLoopGroup(1, new DefaultThreadFactory("AsyncFSWAL", true, Thread.MAX_PRIORITY)); 097 channelClass = NioSocketChannel.class; 098 } 099 } 100 101 /** 102 * Public because of AsyncFSWAL. Should be package-private 103 */ 104 public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, 105 boolean overwritable, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) 106 throws IOException { 107 return createAsyncWriter(conf, fs, path, overwritable, WALUtil.getWALBlockSize(conf, fs, path), 108 eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, path.getName())); 109 } 110 111 /** 112 * Public because of AsyncFSWAL. Should be package-private 113 */ 114 public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, 115 boolean overwritable, long blocksize, EventLoopGroup eventLoopGroup, 116 Class<? extends Channel> channelClass, StreamSlowMonitor monitor) throws IOException { 117 // Configuration already does caching for the Class lookup. 118 Class<? extends AsyncWriter> logWriterClass = 119 conf.getClass(WRITER_IMPL, AsyncProtobufLogWriter.class, AsyncWriter.class); 120 try { 121 AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class) 122 .newInstance(eventLoopGroup, channelClass); 123 writer.init(fs, path, conf, overwritable, blocksize, monitor); 124 return writer; 125 } catch (Exception e) { 126 if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { 127 LOG.error("The RegionServer async write ahead log provider " 128 + "relies on the ability to call " + e.getMessage() + " for proper operation during " 129 + "component failures, but the current FileSystem does not support doing so. Please " 130 + "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " 131 + "it points to a FileSystem mount that has suitable capabilities for output streams."); 132 } else { 133 LOG.debug("Error instantiating log writer.", e); 134 } 135 Throwables.propagateIfPossible(e, IOException.class); 136 throw new IOException("cannot get log writer", e); 137 } 138 } 139 140 /** 141 * Test whether we can load the helper classes for async dfs output. 142 */ 143 public static boolean load() { 144 try { 145 Class.forName(FanOutOneBlockAsyncDFSOutput.class.getName()); 146 Class.forName(FanOutOneBlockAsyncDFSOutputHelper.class.getName()); 147 Class.forName(FanOutOneBlockAsyncDFSOutputSaslHelper.class.getName()); 148 return true; 149 } catch (Throwable e) { 150 return false; 151 } 152 } 153}