001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.DEFAULT_PROVIDER_ID; 021import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID; 022import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collection; 027import java.util.Collections; 028import java.util.List; 029import java.util.concurrent.atomic.AtomicBoolean; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Abortable; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.client.RegionInfo; 036import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 037import org.apache.hadoop.hbase.regionserver.wal.FSHLog; 038import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; 039import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 040import org.apache.hadoop.hbase.util.CommonFSUtils; 041import org.apache.hadoop.hbase.wal.WAL.Entry; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * A WAL Provider that returns a single thread safe WAL that optionally can skip parts of our normal 048 * interactions with HDFS. 049 * <p> 050 * This implementation picks a directory in HDFS based on the same mechanisms as the 051 * {@link FSHLogProvider}. Users can configure how much interaction we have with HDFS with the 052 * configuration property "hbase.wal.iotestprovider.operations". The value should be a comma 053 * separated list of allowed operations: 054 * <ul> 055 * <li><em>append</em> : edits will be written to the underlying filesystem</li> 056 * <li><em>sync</em> : wal syncs will result in hflush calls</li> 057 * <li><em>fileroll</em> : roll requests will result in creating a new file on the underlying 058 * filesystem.</li> 059 * </ul> 060 * Additionally, the special cases "all" and "none" are recognized. If ommited, the value defaults 061 * to "all." Behavior is undefined if "all" or "none" are paired with additional values. Behavior is 062 * also undefined if values not listed above are included. 063 * <p> 064 * Only those operations listed will occur between the returned WAL and HDFS. All others will be 065 * no-ops. 066 * <p> 067 * Note that in the case of allowing "append" operations but not allowing "fileroll", the returned 068 * WAL will just keep writing to the same file. This won't avoid all costs associated with file 069 * management over time, becaue the data set size may result in additional HDFS block allocations. 070 */ 071@InterfaceAudience.Private 072public class IOTestProvider implements WALProvider { 073 private static final Logger LOG = LoggerFactory.getLogger(IOTestProvider.class); 074 075 private static final String ALLOWED_OPERATIONS = "hbase.wal.iotestprovider.operations"; 076 077 private enum AllowedOperations { 078 all, 079 append, 080 sync, 081 fileroll, 082 none 083 } 084 085 private WALFactory factory; 086 087 private Configuration conf; 088 089 private volatile FSHLog log; 090 091 private String providerId; 092 protected AtomicBoolean initialized = new AtomicBoolean(false); 093 094 private List<WALActionsListener> listeners = new ArrayList<>(); 095 096 /** 097 * @param factory factory that made us, identity used for FS layout. may not be null 098 * @param conf may not be null 099 * @param providerId differentiate between providers from one facotry, used for FS layout. may be 100 * null 101 */ 102 @Override 103 public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable) 104 throws IOException { 105 if (!initialized.compareAndSet(false, true)) { 106 throw new IllegalStateException("WALProvider.init should only be called once."); 107 } 108 this.factory = factory; 109 this.conf = conf; 110 this.providerId = providerId != null ? providerId : DEFAULT_PROVIDER_ID; 111 } 112 113 @Override 114 public List<WAL> getWALs() { 115 return Collections.singletonList(log); 116 } 117 118 private FSHLog createWAL() throws IOException { 119 String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId; 120 return new IOTestWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), 121 AbstractFSWALProvider.getWALDirectoryName(factory.factoryId), 122 HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix, 123 META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); 124 } 125 126 @Override 127 public WAL getWAL(RegionInfo region) throws IOException { 128 FSHLog log = this.log; 129 if (log != null) { 130 return log; 131 } 132 synchronized (this) { 133 log = this.log; 134 if (log == null) { 135 log = createWAL(); 136 this.log = log; 137 } 138 } 139 return log; 140 } 141 142 @Override 143 public void close() throws IOException { 144 FSHLog log = this.log; 145 if (log != null) { 146 log.close(); 147 } 148 } 149 150 @Override 151 public void shutdown() throws IOException { 152 FSHLog log = this.log; 153 if (log != null) { 154 log.shutdown(); 155 } 156 } 157 158 private static class IOTestWAL extends FSHLog { 159 160 private final boolean doFileRolls; 161 162 // Used to differntiate between roll calls before and after we finish construction. 163 private final boolean initialized; 164 165 /** 166 * Create an edit log at the given <code>dir</code> location. You should never have to load an 167 * existing log. If there is a log at startup, it should have already been processed and deleted 168 * by the time the WAL object is started up. 169 * @param fs filesystem handle 170 * @param rootDir path to where logs and oldlogs 171 * @param logDir dir where wals are stored 172 * @param archiveDir dir where wals are archived 173 * @param conf configuration to use 174 * @param listeners Listeners on WAL events. Listeners passed here will be registered 175 * before we do anything else; e.g. the Constructor 176 * {@link #rollWriter()}. 177 * @param failIfWALExists If true IOException will be thrown if files related to this wal 178 * already exist. 179 * @param prefix should always be hostname and port in distributed env and it will be 180 * URL encoded before being used. If prefix is null, "wal" will be used 181 * @param suffix will be url encoded. null is treated as empty. non-empty must start 182 * with {@link AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER} 183 */ 184 public IOTestWAL(final FileSystem fs, final Path rootDir, final String logDir, 185 final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners, 186 final boolean failIfWALExists, final String prefix, final String suffix) throws IOException { 187 super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); 188 Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS); 189 doFileRolls = operations.isEmpty() || operations.contains(AllowedOperations.all.name()) 190 || operations.contains(AllowedOperations.fileroll.name()); 191 initialized = true; 192 LOG.info("Initialized with file rolling " + (doFileRolls ? "enabled" : "disabled")); 193 } 194 195 private Writer noRollsWriter; 196 197 // creatWriterInstance is where the new pipeline is set up for doing file rolls 198 // if we are skipping it, just keep returning the same writer. 199 @Override 200 protected Writer createWriterInstance(FileSystem fs, final Path path) throws IOException { 201 // we get called from the FSHLog constructor (!); always roll in this case since 202 // we don't know yet if we're supposed to generally roll and 203 // we need an initial file in the case of doing appends but no rolls. 204 if (!initialized || doFileRolls) { 205 LOG.info("creating new writer instance."); 206 final ProtobufLogWriter writer = new IOTestWriter(); 207 try { 208 writer.init(fs, path, conf, false, this.blocksize, 209 StreamSlowMonitor.create(conf, path.getName())); 210 } catch (CommonFSUtils.StreamLacksCapabilityException exception) { 211 throw new IOException("Can't create writer instance because underlying FileSystem " 212 + "doesn't support needed stream capabilities.", exception); 213 } 214 if (!initialized) { 215 LOG.info("storing initial writer instance in case file rolling isn't allowed."); 216 noRollsWriter = writer; 217 } 218 return writer; 219 } else { 220 LOG.info("WAL rolling disabled, returning the first writer."); 221 // Initial assignment happens during the constructor call, so there ought not be 222 // a race for first assignment. 223 return noRollsWriter; 224 } 225 } 226 } 227 228 /** 229 * Presumes init will be called by a single thread prior to any access of other methods. 230 */ 231 private static class IOTestWriter extends ProtobufLogWriter { 232 private boolean doAppends; 233 private boolean doSyncs; 234 235 @Override 236 public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, 237 long blocksize, StreamSlowMonitor monitor) 238 throws IOException, CommonFSUtils.StreamLacksCapabilityException { 239 Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS); 240 if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) { 241 doAppends = doSyncs = true; 242 } else if (operations.contains(AllowedOperations.none.name())) { 243 doAppends = doSyncs = false; 244 } else { 245 doAppends = operations.contains(AllowedOperations.append.name()); 246 doSyncs = operations.contains(AllowedOperations.sync.name()); 247 } 248 LOG.info("IOTestWriter initialized with appends " + (doAppends ? "enabled" : "disabled") 249 + " and syncs " + (doSyncs ? "enabled" : "disabled")); 250 super.init(fs, path, conf, overwritable, blocksize, monitor); 251 } 252 253 @Override 254 public void append(Entry entry) throws IOException { 255 if (doAppends) { 256 super.append(entry); 257 } 258 } 259 260 @Override 261 public void sync(boolean forceSync) throws IOException { 262 if (doSyncs) { 263 super.sync(forceSync); 264 } 265 } 266 } 267 268 @Override 269 public long getNumLogFiles() { 270 return this.log.getNumLogFiles(); 271 } 272 273 @Override 274 public long getLogFileSize() { 275 return this.log.getLogFileSize(); 276 } 277 278 @Override 279 public void addWALActionsListener(WALActionsListener listener) { 280 // TODO Implement WALProvider.addWALActionLister 281 282 } 283}