001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.lang.reflect.Field; 026import java.util.List; 027import java.util.NavigableMap; 028import java.util.TreeMap; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.ExecutorService; 031import java.util.concurrent.Executors; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicBoolean; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.Waiter; 041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.RegionInfo; 044import org.apache.hadoop.hbase.client.RegionInfoBuilder; 045import org.apache.hadoop.hbase.client.TableDescriptor; 046import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 047import org.apache.hadoop.hbase.regionserver.ChunkCreator; 048import org.apache.hadoop.hbase.regionserver.HRegion; 049import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 050import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.testclassification.RegionServerTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.CommonFSUtils; 055import org.apache.hadoop.hbase.util.Threads; 056import org.apache.hadoop.hbase.wal.FSHLogProvider; 057import org.apache.hadoop.hbase.wal.WAL; 058import org.apache.hadoop.hbase.wal.WALEdit; 059import org.apache.hadoop.hbase.wal.WALKey; 060import org.apache.hadoop.hbase.wal.WALProvider; 061import org.junit.ClassRule; 062import org.junit.Rule; 063import org.junit.Test; 064import org.junit.experimental.categories.Category; 065import org.junit.rules.TestName; 066 067/** 068 * Provides FSHLog test cases. 069 */ 070@Category({ RegionServerTests.class, MediumTests.class }) 071public class TestFSHLog extends AbstractTestFSWAL { 072 073 @ClassRule 074 public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestFSHLog.class); 075 076 private static final long TEST_TIMEOUT_MS = 10000; 077 078 @Rule 079 public TestName name = new TestName(); 080 081 @Override 082 protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String walDir, String archiveDir, 083 Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, 084 String suffix) throws IOException { 085 FSHLog fshLog = 086 new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); 087 fshLog.init(); 088 return fshLog; 089 } 090 091 @Override 092 protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir, 093 String archiveDir, Configuration conf, List<WALActionsListener> listeners, 094 boolean failIfWALExists, String prefix, String suffix, final Runnable action) 095 throws IOException { 096 FSHLog fshLog = new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, 097 prefix, suffix) { 098 099 @Override 100 protected void atHeadOfRingBufferEventHandlerAppend() { 101 action.run(); 102 super.atHeadOfRingBufferEventHandlerAppend(); 103 } 104 }; 105 fshLog.init(); 106 return fshLog; 107 } 108 109 @Test 110 public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldException, 111 SecurityException, IllegalArgumentException, IllegalAccessException { 112 final String name = this.name.getMethodName(); 113 FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), name, 114 HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); 115 log.init(); 116 try { 117 Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler"); 118 ringBufferEventHandlerField.setAccessible(true); 119 FSHLog.RingBufferEventHandler ringBufferEventHandler = 120 (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log); 121 Field syncRunnerIndexField = 122 FSHLog.RingBufferEventHandler.class.getDeclaredField("syncRunnerIndex"); 123 syncRunnerIndexField.setAccessible(true); 124 syncRunnerIndexField.set(ringBufferEventHandler, Integer.MAX_VALUE - 1); 125 TableDescriptor htd = 126 TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName())) 127 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); 128 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 129 for (byte[] fam : htd.getColumnFamilyNames()) { 130 scopes.put(fam, 0); 131 } 132 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 133 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 134 for (int i = 0; i < 10; i++) { 135 addEdits(log, hri, htd, 1, mvcc, scopes, "row"); 136 } 137 } finally { 138 log.close(); 139 } 140 } 141 142 /** 143 * Test for WAL stall due to sync future overwrites. See HBASE-25984. 144 */ 145 @Test 146 public void testDeadlockWithSyncOverwrites() throws Exception { 147 final CountDownLatch blockBeforeSafePoint = new CountDownLatch(1); 148 149 class FailingWriter implements WALProvider.Writer { 150 @Override 151 public void sync(boolean forceSync) throws IOException { 152 throw new IOException("Injected failure.."); 153 } 154 155 @Override 156 public void append(WAL.Entry entry) throws IOException { 157 } 158 159 @Override 160 public long getLength() { 161 return 0; 162 } 163 164 @Override 165 public long getSyncedLength() { 166 return 0; 167 } 168 169 @Override 170 public void close() throws IOException { 171 } 172 } 173 174 /* 175 * Custom FSHLog implementation with a conditional wait before attaining safe point. 176 */ 177 class CustomFSHLog extends FSHLog { 178 public CustomFSHLog(FileSystem fs, Path rootDir, String logDir, String archiveDir, 179 Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, 180 String prefix, String suffix) throws IOException { 181 super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); 182 } 183 184 @Override 185 protected void beforeWaitOnSafePoint() { 186 try { 187 assertTrue(blockBeforeSafePoint.await(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)); 188 } catch (InterruptedException e) { 189 throw new RuntimeException(e); 190 } 191 } 192 193 public SyncFuture publishSyncOnRingBuffer() { 194 long sequence = getSequenceOnRingBuffer(); 195 return publishSyncOnRingBuffer(sequence, false); 196 } 197 } 198 199 final String name = this.name.getMethodName(); 200 Configuration conf = new Configuration(CONF); 201 conf.set(FSHLogProvider.WRITER_IMPL, FailingWriter.class.getName()); 202 try (CustomFSHLog log = new CustomFSHLog(FS, CommonFSUtils.getRootDir(conf), name, 203 HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null)) { 204 Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler"); 205 ringBufferEventHandlerField.setAccessible(true); 206 FSHLog.RingBufferEventHandler ringBufferEventHandler = 207 (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log); 208 // Force a safe point 209 FSHLog.SafePointZigZagLatch latch = ringBufferEventHandler.attainSafePoint(); 210 try { 211 SyncFuture future0 = log.publishSyncOnRingBuffer(); 212 // Wait for the sync to be done. 213 Waiter.waitFor(conf, TEST_TIMEOUT_MS, future0::isDone); 214 // Publish another sync from the same thread, this should not overwrite the done sync. 215 SyncFuture future1 = log.publishSyncOnRingBuffer(); 216 assertFalse(future1.isDone()); 217 // Unblock the safe point trigger.. 218 blockBeforeSafePoint.countDown(); 219 // Wait for the safe point to be reached. 220 // With the deadlock in HBASE-25984, this is never possible, thus blocking the sync 221 // pipeline. 222 Waiter.waitFor(conf, TEST_TIMEOUT_MS, latch::isSafePointAttained); 223 } finally { 224 // Force release the safe point, for the clean up. 225 latch.releaseSafePoint(); 226 } 227 } 228 } 229 230 /** 231 * Test case for https://issues.apache.org/jira/browse/HBASE-16721 232 */ 233 @Test 234 public void testUnflushedSeqIdTracking() throws IOException, InterruptedException { 235 final String name = this.name.getMethodName(); 236 final byte[] b = Bytes.toBytes("b"); 237 238 final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false); 239 final CountDownLatch holdAppend = new CountDownLatch(1); 240 final CountDownLatch flushFinished = new CountDownLatch(1); 241 final CountDownLatch putFinished = new CountDownLatch(1); 242 243 try (FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), name, 244 HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) { 245 log.init(); 246 log.registerWALActionsListener(new WALActionsListener() { 247 @Override 248 public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) { 249 if (startHoldingForAppend.get()) { 250 try { 251 holdAppend.await(); 252 } catch (InterruptedException e) { 253 LOG.error(e.toString(), e); 254 } 255 } 256 } 257 }); 258 259 // open a new region which uses this WAL 260 TableDescriptor htd = 261 TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName())) 262 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build(); 263 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 264 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 265 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 266 final HRegion region = TEST_UTIL.createLocalHRegion(hri, CONF, htd, log); 267 ExecutorService exec = Executors.newFixedThreadPool(2); 268 269 // do a regular write first because of memstore size calculation. 270 region.put(new Put(b).addColumn(b, b, b)); 271 272 startHoldingForAppend.set(true); 273 exec.submit(new Runnable() { 274 @Override 275 public void run() { 276 try { 277 region.put(new Put(b).addColumn(b, b, b)); 278 putFinished.countDown(); 279 } catch (IOException e) { 280 LOG.error(e.toString(), e); 281 } 282 } 283 }); 284 285 // give the put a chance to start 286 Threads.sleep(3000); 287 288 exec.submit(new Runnable() { 289 @Override 290 public void run() { 291 try { 292 HRegion.FlushResult flushResult = region.flush(true); 293 LOG.info("Flush result:" + flushResult.getResult()); 294 LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded()); 295 flushFinished.countDown(); 296 } catch (IOException e) { 297 LOG.error(e.toString(), e); 298 } 299 } 300 }); 301 302 // give the flush a chance to start. Flush should have got the region lock, and 303 // should have been waiting on the mvcc complete after this. 304 Threads.sleep(3000); 305 306 // let the append to WAL go through now that the flush already started 307 holdAppend.countDown(); 308 putFinished.await(); 309 flushFinished.await(); 310 311 // check whether flush went through 312 assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][] { b }).size()); 313 314 // now check the region's unflushed seqIds. 315 long seqId = log.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes()); 316 assertEquals("Found seqId for the region which is already flushed", HConstants.NO_SEQNUM, 317 seqId); 318 319 region.close(); 320 } 321 } 322}