001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.Assert.assertNull;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.when;
023
024import java.io.IOException;
025import java.io.UncheckedIOException;
026import java.util.List;
027import java.util.NavigableMap;
028import java.util.TreeMap;
029import java.util.concurrent.CompletableFuture;
030import java.util.concurrent.atomic.AtomicInteger;
031import java.util.concurrent.atomic.AtomicReference;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.KeyValue;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.client.RegionInfoBuilder;
042import org.apache.hadoop.hbase.client.TableDescriptor;
043import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
044import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
045import org.apache.hadoop.hbase.regionserver.LogRoller;
046import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
047import org.apache.hadoop.hbase.regionserver.RegionServerServices;
048import org.apache.hadoop.hbase.regionserver.SequenceId;
049import org.apache.hadoop.hbase.testclassification.LargeTests;
050import org.apache.hadoop.hbase.testclassification.RegionServerTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.CommonFSUtils;
053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
054import org.apache.hadoop.hbase.util.FutureUtils;
055import org.apache.hadoop.hbase.util.Threads;
056import org.apache.hadoop.hbase.wal.WALEdit;
057import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
058import org.apache.hadoop.hbase.wal.WALKey;
059import org.apache.hadoop.hbase.wal.WALKeyImpl;
060import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
061import org.junit.AfterClass;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066
067import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
068import org.apache.hbase.thirdparty.io.netty.channel.Channel;
069import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
070import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
071import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
072
073/**
074 * Provides AsyncFSWAL test cases.
075 */
076@Category({ RegionServerTests.class, LargeTests.class })
077public class TestAsyncFSWAL extends AbstractTestFSWAL {
078
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081    HBaseClassTestRule.forClass(TestAsyncFSWAL.class);
082
083  private static EventLoopGroup GROUP;
084
085  private static Class<? extends Channel> CHANNEL_CLASS;
086
087  @BeforeClass
088  public static void setUpBeforeClass() throws Exception {
089    GROUP =
090      new NioEventLoopGroup(1, new ThreadFactoryBuilder().setNameFormat("TestAsyncFSWAL-pool-%d")
091        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
092    CHANNEL_CLASS = NioSocketChannel.class;
093    AbstractTestFSWAL.setUpBeforeClass();
094  }
095
096  @AfterClass
097  public static void tearDownAfterClass() throws Exception {
098    AbstractTestFSWAL.tearDownAfterClass();
099    GROUP.shutdownGracefully();
100  }
101
102  @Override
103  protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
104    Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix,
105    String suffix) throws IOException {
106    AsyncFSWAL wal = new AsyncFSWAL(fs, null, rootDir, logDir, archiveDir, conf, listeners,
107      failIfWALExists, prefix, suffix, null, null, GROUP, CHANNEL_CLASS,
108      StreamSlowMonitor.create(conf, "monitor"));
109    wal.init();
110    return wal;
111  }
112
113  @Override
114  protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String logDir,
115    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
116    boolean failIfWALExists, String prefix, String suffix, final Runnable action)
117    throws IOException {
118    AsyncFSWAL wal = new AsyncFSWAL(fs, null, rootDir, logDir, archiveDir, conf, listeners,
119      failIfWALExists, prefix, suffix, null, null, GROUP, CHANNEL_CLASS,
120      StreamSlowMonitor.create(conf, "monitor")) {
121
122      @Override
123      protected void atHeadOfRingBufferEventHandlerAppend() {
124        action.run();
125        super.atHeadOfRingBufferEventHandlerAppend();
126      }
127    };
128    wal.init();
129    return wal;
130  }
131
132  @Test
133  public void testBrokenWriter() throws Exception {
134    RegionServerServices services = mock(RegionServerServices.class);
135    when(services.getConfiguration()).thenReturn(CONF);
136    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
137      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
138    RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
139    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
140    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
141    for (byte[] fam : td.getColumnFamilyNames()) {
142      scopes.put(fam, 0);
143    }
144    long timestamp = EnvironmentEdgeManager.currentTime();
145    String testName = currentTest.getMethodName();
146    AtomicInteger failedCount = new AtomicInteger(0);
147    try (LogRoller roller = new LogRoller(services);
148      AsyncFSWAL wal = new AsyncFSWAL(FS, null, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
149        testName, CONF, null, true, null, null, null, null, GROUP, CHANNEL_CLASS,
150        StreamSlowMonitor.create(CONF, "monitorForSuffix")) {
151
152        @Override
153        protected AsyncWriter createWriterInstance(FileSystem fs, Path path) throws IOException {
154          AsyncWriter writer = super.createWriterInstance(fs, path);
155          return new AsyncWriter() {
156
157            @Override
158            public void close() throws IOException {
159              writer.close();
160            }
161
162            @Override
163            public long getLength() {
164              return writer.getLength();
165            }
166
167            @Override
168            public long getSyncedLength() {
169              return writer.getSyncedLength();
170            }
171
172            @Override
173            public CompletableFuture<Long> sync(boolean forceSync) {
174              CompletableFuture<Long> result = writer.sync(forceSync);
175              if (failedCount.incrementAndGet() < 1000) {
176                CompletableFuture<Long> future = new CompletableFuture<>();
177                FutureUtils.addListener(result,
178                  (r, e) -> future.completeExceptionally(new IOException("Inject Error")));
179                return future;
180              } else {
181                return result;
182              }
183            }
184
185            @Override
186            public void append(Entry entry) {
187              writer.append(entry);
188            }
189          };
190        }
191      }) {
192      wal.init();
193      roller.addWAL(wal);
194      roller.start();
195      int numThreads = 10;
196      AtomicReference<Exception> error = new AtomicReference<>();
197      Thread[] threads = new Thread[numThreads];
198      for (int i = 0; i < 10; i++) {
199        final int index = i;
200        threads[index] = new Thread("Write-Thread-" + index) {
201
202          @Override
203          public void run() {
204            byte[] row = Bytes.toBytes("row" + index);
205            WALEdit cols = new WALEdit();
206            WALEditInternalHelper.addExtendedCell(cols,
207              new KeyValue(row, row, row, timestamp + index, row));
208            WALKeyImpl key = new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(),
209              SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
210              HConstants.NO_NONCE, mvcc, scopes);
211            try {
212              wal.append(ri, key, cols, true);
213            } catch (IOException e) {
214              // should not happen
215              throw new UncheckedIOException(e);
216            }
217            try {
218              wal.sync();
219            } catch (IOException e) {
220              error.set(e);
221            }
222          }
223        };
224      }
225      for (Thread t : threads) {
226        t.start();
227      }
228      for (Thread t : threads) {
229        t.join();
230      }
231      assertNull(error.get());
232    }
233  }
234}