/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Provides FSHLog test cases.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestFSHLog extends AbstractTestFSWAL {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestFSHLog.class);

  // NOTE(review): not referenced by any test in this class; kept in case it is used elsewhere.
  private static final long TEST_TIMEOUT_MS = 10000;

  /** Supplies the running test method's name, used below as WAL directory and table name. */
  @Rule
  public TestName name = new TestName();

  /**
   * Creates an {@link FSHLog} from the given arguments and calls {@code init()} on it before
   * returning it.
   */
  @Override
  protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String walDir, String archiveDir,
    Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix,
    String suffix) throws IOException {
    FSHLog wal =
      new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
    wal.init();
    return wal;
  }

  /**
   * Creates an initialized {@link FSHLog} whose {@code atHeadOfRingBufferEventHandlerAppend()} hook
   * runs {@code action} before delegating to the superclass implementation.
   * @param action callback run at the start of every invocation of the overridden hook
   */
  @Override
  protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir,
    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
    boolean failIfWALExists, String prefix, String suffix, final Runnable action)
    throws IOException {
    FSHLog wal = new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists,
      prefix, suffix) {

      @Override
      protected void atHeadOfRingBufferEventHandlerAppend() {
        action.run();
        super.atHeadOfRingBufferEventHandlerAppend();
      }
    };
    wal.init();
    return wal;
  }

  /**
   * Sets the private {@code syncRunnerIndex} field of an {@link FSHLog} to
   * {@code Integer.MAX_VALUE - 1} through reflection and then appends ten edits. The test passes
   * when none of the appends throws.
   */
  @Test
  public void testSyncRunnerIndexOverflow()
    throws IOException, NoSuchFieldException, IllegalAccessException {
    final String name = this.name.getMethodName();
    // try-with-resources also closes the WAL when init() itself fails.
    try (FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
      log.init();
      Field syncRunnerIndexField = FSHLog.class.getDeclaredField("syncRunnerIndex");
      syncRunnerIndexField.setAccessible(true);
      // Presumably the index is advanced on each sync, so the appends below push it past
      // Integer.MAX_VALUE -- TODO confirm against FSHLog.
      syncRunnerIndexField.set(log, Integer.MAX_VALUE - 1);
      TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
      NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (byte[] fam : htd.getColumnFamilyNames()) {
        scopes.put(fam, 0);
      }
      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
      MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
      for (int i = 0; i < 10; i++) {
        addEdits(log, hri, htd, 1, mvcc, scopes, "row");
      }
    }
  }

  /**
   * Test case for https://issues.apache.org/jira/browse/HBASE-16721
   * <p>
   * Holds a put inside the WAL listener callback, starts a flush while that put is held, releases
   * the put, and then checks that the region flushed one store file and that the WAL reports
   * {@link HConstants#NO_SEQNUM} as the earliest unflushed sequence id for the region.
   * <p>
   * The put and the flush run as {@link Callable}s, so a failure in either of them surfaces from
   * {@link Future#get()} as an {@link ExecutionException} and fails the test instead of leaving
   * it blocked forever.
   */
  @Test
  public void testUnflushedSeqIdTracking()
    throws IOException, InterruptedException, ExecutionException {
    final String name = this.name.getMethodName();
    final byte[] b = Bytes.toBytes("b");

    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
    final CountDownLatch holdAppend = new CountDownLatch(1);

    try (FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
      log.init();
      log.registerWALActionsListener(new WALActionsListener() {
        @Override
        public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
          // Once armed, park the calling thread until the test releases the latch.
          if (startHoldingForAppend.get()) {
            try {
              holdAppend.await();
            } catch (InterruptedException e) {
              LOG.error(e.toString(), e);
              // Keep the interrupt visible to the code that called this listener.
              Thread.currentThread().interrupt();
            }
          }
        }
      });

      // open a new region which uses this WAL
      TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
      ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
        MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
      final HRegion region = TEST_UTIL.createLocalHRegion(hri, CONF, htd, log);
      ExecutorService exec = Executors.newFixedThreadPool(2);
      try {
        // do a regular write first because of memstore size calculation.
        region.put(new Put(b).addColumn(b, b, b));

        startHoldingForAppend.set(true);
        Future<Void> putFuture = exec.submit(new Callable<Void>() {
          @Override
          public Void call() throws IOException {
            region.put(new Put(b).addColumn(b, b, b));
            return null;
          }
        });

        // give the put a chance to start
        Threads.sleep(3000);

        Future<Void> flushFuture = exec.submit(new Callable<Void>() {
          @Override
          public Void call() throws IOException {
            HRegion.FlushResult flushResult = region.flush(true);
            LOG.info("Flush result:" + flushResult.getResult());
            LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
            return null;
          }
        });

        // give the flush a chance to start. Flush should have got the region lock, and
        // should have been waiting on the mvcc complete after this.
        Threads.sleep(3000);

        // let the append to WAL go through now that the flush already started
        holdAppend.countDown();
        // get() rethrows any failure of the worker wrapped in an ExecutionException
        putFuture.get();
        flushFuture.get();

        // check whether flush went through
        assertEquals("Region did not flush?", 1,
          region.getStoreFileList(new byte[][] { b }).size());

        // now check the region's unflushed seqIds.
        long seqId = AbstractTestFSWAL.getEarliestMemStoreSeqNum(log, hri.getEncodedNameAsBytes());
        assertEquals("Found seqId for the region which is already flushed", HConstants.NO_SEQNUM,
          seqId);
      } finally {
        // Release an append that may still be parked so the workers can finish, stop the pool
        // from leaking its threads, and close the region before the WAL is closed.
        holdAppend.countDown();
        exec.shutdown();
        region.close();
      }
    }
  }
}