/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Testcase for HBASE-25905.
 * <p>
 * The test injects a writer whose first two syncs fail and whose third sync is held back until a
 * WAL roll has been requested. Before the fix in HBASE-25905 this left edits in unackedAppends
 * that were never written out, and {@code rollWriter} hung forever inside waitForSafePoint.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestAsyncFSWALRollStuck {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncFSWALRollStuck.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestAsyncFSWALRollStuck.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private static final EventLoopGroup EVENT_LOOP_GROUP = new NioEventLoopGroup();

  private static final Class<? extends Channel> CHANNEL_CLASS = NioSocketChannel.class;

  /** Scheduler used by the test to release {@link #RESUME} after a delay. */
  private static ScheduledExecutorService EXECUTOR;

  /** Futures handed out for the first two syncs; the test completes them exceptionally. */
  private static final BlockingQueue<CompletableFuture<Long>> FUTURES =
    new ArrayBlockingQueue<>(3);

  /** Number of times {@link TestAsyncWriter#sync(boolean)} has been invoked so far. */
  private static final AtomicInteger SYNC_COUNT = new AtomicInteger(0);

  /** Counted down by the writer once the third sync call has been entered. */
  private static final CountDownLatch ARRIVE = new CountDownLatch(1);

  /** Counted down by the test to let the third sync call proceed. */
  private static final CountDownLatch RESUME = new CountDownLatch(1);

  /**
   * Writer which lets the test control the outcome and the timing of the first three sync calls.
   */
  public static final class TestAsyncWriter extends AsyncProtobufLogWriter {

    public TestAsyncWriter(EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
      super(eventLoopGroup, channelClass);
    }

    /**
     * Intercepts the first three sync calls (counted across all writer instances, as the counter
     * is static) and delegates every later call to the real implementation.
     * @param forceSync passed through to the parent implementation when delegating
     * @return for the first two calls, an incomplete future which is also published to
     *         {@link #FUTURES}; otherwise the future returned by the parent implementation
     */
    @Override
    public CompletableFuture<Long> sync(boolean forceSync) {
      int count = SYNC_COUNT.incrementAndGet();
      if (count < 3) {
        // we will mark these two futures as failure, to make sure that we have 2 edits in
        // unackedAppends, and trigger a WAL roll
        CompletableFuture<Long> future = new CompletableFuture<>();
        FUTURES.offer(future);
        return future;
      }
      if (count == 3) {
        // for this future, we will mark it as succeeded, but before returning from this method,
        // we need to request a roll, to enter the special corner case, where we have left some
        // edits in unackedAppends but never try to write them out, which leads to a hang
        ARRIVE.countDown();
        try {
          RESUME.await();
        } catch (InterruptedException e) {
          // do not swallow the interrupt, keep the flag set so that the owner of this thread can
          // still observe it after we return
          Thread.currentThread().interrupt();
        }
      }
      return super.sync(forceSync);
    }
  }

  private static TableName TN;

  private static RegionInfo RI;

  private static MultiVersionConcurrencyControl MVCC;

  private static AsyncFSWAL WAL;

  /** Single thread on which the roll requests raised by the WAL listener are executed. */
  private static ExecutorService ROLL_EXEC;

  /**
   * Creates the WAL under test, wired to use {@link TestAsyncWriter} and to roll the writer on a
   * dedicated thread whenever a log roll is requested.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setClass(AsyncFSWALProvider.WRITER_IMPL, TestAsyncWriter.class, AsyncWriter.class);
    // set a very small size so we will reach the batch size when writing out a single edit
    conf.setLong(AbstractFSWAL.WAL_BATCH_SIZE, 1);

    TN = TableName.valueOf("test");
    RI = RegionInfoBuilder.newBuilder(TN).build();
    MVCC = new MultiVersionConcurrencyControl();

    EXECUTOR =
      Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setDaemon(true).build());

    Path rootDir = UTIL.getDataTestDir();
    ROLL_EXEC =
      Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).build());
    WALActionsListener listener = new WALActionsListener() {

      @Override
      public void logRollRequested(RollRequestReason reason) {
        // roll on a separate thread instead of the thread which reported the request
        ROLL_EXEC.execute(() -> {
          try {
            WAL.rollWriter();
          } catch (Exception e) {
            LOG.warn("failed to roll writer", e);
          }
        });
      }

    };
    WAL = new AsyncFSWAL(UTIL.getTestFileSystem(), null, rootDir, "log", "oldlog", conf,
      Arrays.asList(listener), true, null, null, null, null, EVENT_LOOP_GROUP, CHANNEL_CLASS,
      StreamSlowMonitor.create(conf, "monitor"));
    WAL.init();
  }

  /**
   * Stops the helper executors, closes the WAL, releases the event loop group and removes the
   * test directory.
   */
  @AfterClass
  public static void tearDown() throws Exception {
    EXECUTOR.shutdownNow();
    ROLL_EXEC.shutdownNow();
    Closeables.close(WAL, true);
    // the event loop group is created by this class, so it must be released here too, otherwise
    // its threads outlive the test; do it after closing the WAL, which still uses the group
    EVENT_LOOP_GROUP.shutdownGracefully().syncUninterruptibly();
    UTIL.cleanupTestDir();
  }

  /**
   * Appends two edits whose syncs are failed on purpose, then rolls the WAL while a third sync is
   * still blocked inside the writer, and finally verifies that the last edit can be synced.
   */
  @Test
  public void testRoll() throws Exception {
    // the same bytes are used for row, family, qualifier and value, the content does not matter
    byte[] row = Bytes.toBytes("family");
    WALEdit edit = new WALEdit();
    WALEditInternalHelper.addExtendedCell(edit,
      ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setFamily(row)
        .setQualifier(row).setRow(row).setValue(row)
        .setTimestamp(EnvironmentEdgeManager.currentTime()).setType(Type.Put).build());
    WALKeyImpl key1 =
      new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, EnvironmentEdgeManager.currentTime(), MVCC);
    WAL.appendData(RI, key1, edit);

    WALKeyImpl key2 = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, key1.getWriteTime() + 1, MVCC);
    long txid = WAL.appendData(RI, key2, edit);

    // we need to make sure the two edits have both been added to unackedAppends, so we have two
    // syncs
    UTIL.waitFor(10000, () -> FUTURES.size() == 2);
    FUTURES.poll().completeExceptionally(new IOException("inject error"));
    FUTURES.poll().completeExceptionally(new IOException("inject error"));
    ARRIVE.await();
    // resume after 1 second, to give us enough time to enter the roll state
    EXECUTOR.schedule(() -> RESUME.countDown(), 1, TimeUnit.SECONDS);
    // let's roll the wal, before the fix in HBASE-25905, it will hang forever inside
    // waitForSafePoint
    WAL.rollWriter();
    // make sure we can finally succeed
    WAL.sync(txid);
  }
}