001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.Assert.assertNull; 021import static org.mockito.Mockito.mock; 022import static org.mockito.Mockito.when; 023 024import java.io.IOException; 025import java.io.UncheckedIOException; 026import java.util.List; 027import java.util.NavigableMap; 028import java.util.TreeMap; 029import java.util.concurrent.CompletableFuture; 030import java.util.concurrent.atomic.AtomicInteger; 031import java.util.concurrent.atomic.AtomicReference; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.KeyValue; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 040import org.apache.hadoop.hbase.client.RegionInfo; 041import org.apache.hadoop.hbase.client.RegionInfoBuilder; 042import org.apache.hadoop.hbase.client.TableDescriptor; 043import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 044import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 045import org.apache.hadoop.hbase.regionserver.LogRoller; 046import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 047import org.apache.hadoop.hbase.regionserver.RegionServerServices; 048import org.apache.hadoop.hbase.regionserver.SequenceId; 049import org.apache.hadoop.hbase.testclassification.LargeTests; 050import org.apache.hadoop.hbase.testclassification.RegionServerTests; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.CommonFSUtils; 053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 054import org.apache.hadoop.hbase.util.FutureUtils; 055import org.apache.hadoop.hbase.util.Threads; 056import org.apache.hadoop.hbase.wal.WALEdit; 057import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 058import org.apache.hadoop.hbase.wal.WALKey; 059import org.apache.hadoop.hbase.wal.WALKeyImpl; 060import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; 061import org.junit.AfterClass; 062import org.junit.BeforeClass; 063import org.junit.ClassRule; 064import org.junit.Test; 065import org.junit.experimental.categories.Category; 066 067import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 068import org.apache.hbase.thirdparty.io.netty.channel.Channel; 069import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 070import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 071import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 072 073/** 074 * Provides AsyncFSWAL test cases. 075 */ 076@Category({ RegionServerTests.class, LargeTests.class }) 077public class TestAsyncFSWAL extends AbstractTestFSWAL { 078 079 @ClassRule 080 public static final HBaseClassTestRule CLASS_RULE = 081 HBaseClassTestRule.forClass(TestAsyncFSWAL.class); 082 083 private static EventLoopGroup GROUP; 084 085 private static Class<? extends Channel> CHANNEL_CLASS; 086 087 @BeforeClass 088 public static void setUpBeforeClass() throws Exception { 089 GROUP = 090 new NioEventLoopGroup(1, new ThreadFactoryBuilder().setNameFormat("TestAsyncFSWAL-pool-%d") 091 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 092 CHANNEL_CLASS = NioSocketChannel.class; 093 AbstractTestFSWAL.setUpBeforeClass(); 094 } 095 096 @AfterClass 097 public static void tearDownAfterClass() throws Exception { 098 AbstractTestFSWAL.tearDownAfterClass(); 099 GROUP.shutdownGracefully(); 100 } 101 102 @Override 103 protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, 104 Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, 105 String suffix) throws IOException { 106 AsyncFSWAL wal = new AsyncFSWAL(fs, null, rootDir, logDir, archiveDir, conf, listeners, 107 failIfWALExists, prefix, suffix, null, null, GROUP, CHANNEL_CLASS, 108 StreamSlowMonitor.create(conf, "monitor")); 109 wal.init(); 110 return wal; 111 } 112 113 @Override 114 protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String logDir, 115 String archiveDir, Configuration conf, List<WALActionsListener> listeners, 116 boolean failIfWALExists, String prefix, String suffix, final Runnable action) 117 throws IOException { 118 AsyncFSWAL wal = new AsyncFSWAL(fs, null, rootDir, logDir, archiveDir, conf, listeners, 119 failIfWALExists, prefix, suffix, null, null, GROUP, CHANNEL_CLASS, 120 StreamSlowMonitor.create(conf, "monitor")) { 121 122 @Override 123 protected void atHeadOfRingBufferEventHandlerAppend() { 124 action.run(); 125 super.atHeadOfRingBufferEventHandlerAppend(); 126 } 127 }; 128 wal.init(); 129 return wal; 130 } 131 132 @Test 133 public void testBrokenWriter() throws Exception { 134 RegionServerServices services = mock(RegionServerServices.class); 135 when(services.getConfiguration()).thenReturn(CONF); 136 TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table")) 137 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); 138 RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build(); 139 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 140 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 141 for (byte[] fam : td.getColumnFamilyNames()) { 142 scopes.put(fam, 0); 143 } 144 long timestamp = EnvironmentEdgeManager.currentTime(); 145 String testName = currentTest.getMethodName(); 146 AtomicInteger failedCount = new AtomicInteger(0); 147 try (LogRoller roller = new LogRoller(services); 148 AsyncFSWAL wal = new AsyncFSWAL(FS, null, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), 149 testName, CONF, null, true, null, null, null, null, GROUP, CHANNEL_CLASS, 150 StreamSlowMonitor.create(CONF, "monitorForSuffix")) { 151 152 @Override 153 protected AsyncWriter createWriterInstance(FileSystem fs, Path path) throws IOException { 154 AsyncWriter writer = super.createWriterInstance(fs, path); 155 return new AsyncWriter() { 156 157 @Override 158 public void close() throws IOException { 159 writer.close(); 160 } 161 162 @Override 163 public long getLength() { 164 return writer.getLength(); 165 } 166 167 @Override 168 public long getSyncedLength() { 169 return writer.getSyncedLength(); 170 } 171 172 @Override 173 public CompletableFuture<Long> sync(boolean forceSync) { 174 CompletableFuture<Long> result = writer.sync(forceSync); 175 if (failedCount.incrementAndGet() < 1000) { 176 CompletableFuture<Long> future = new CompletableFuture<>(); 177 FutureUtils.addListener(result, 178 (r, e) -> future.completeExceptionally(new IOException("Inject Error"))); 179 return future; 180 } else { 181 return result; 182 } 183 } 184 185 @Override 186 public void append(Entry entry) { 187 writer.append(entry); 188 } 189 }; 190 } 191 }) { 192 wal.init(); 193 roller.addWAL(wal); 194 roller.start(); 195 int numThreads = 10; 196 AtomicReference<Exception> error = new AtomicReference<>(); 197 Thread[] threads = new Thread[numThreads]; 198 for (int i = 0; i < 10; i++) { 199 final int index = i; 200 threads[index] = new Thread("Write-Thread-" + index) { 201 202 @Override 203 public void run() { 204 byte[] row = Bytes.toBytes("row" + index); 205 WALEdit cols = new WALEdit(); 206 WALEditInternalHelper.addExtendedCell(cols, 207 new KeyValue(row, row, row, timestamp + index, row)); 208 WALKeyImpl key = new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), 209 SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, 210 HConstants.NO_NONCE, mvcc, scopes); 211 try { 212 wal.append(ri, key, cols, true); 213 } catch (IOException e) { 214 // should not happen 215 throw new UncheckedIOException(e); 216 } 217 try { 218 wal.sync(); 219 } catch (IOException e) { 220 error.set(e); 221 } 222 } 223 }; 224 } 225 for (Thread t : threads) { 226 t.start(); 227 } 228 for (Thread t : threads) { 229 t.join(); 230 } 231 assertNull(error.get()); 232 } 233 } 234}