001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.fail; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.List; 026import java.util.concurrent.CountDownLatch; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtility; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 035import org.apache.hadoop.hbase.client.Increment; 036import org.apache.hadoop.hbase.client.Mutation; 037import org.apache.hadoop.hbase.client.Put; 038import org.apache.hadoop.hbase.client.RegionInfo; 039import org.apache.hadoop.hbase.client.RegionInfoBuilder; 040import org.apache.hadoop.hbase.client.TableDescriptor; 041import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 042import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 043import org.apache.hadoop.hbase.testclassification.RegionServerTests; 044import org.apache.hadoop.hbase.testclassification.SmallTests; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.hbase.util.CommonFSUtils; 047import org.apache.hadoop.hbase.wal.WAL; 048import org.apache.hadoop.hbase.wal.WALEdit; 049import org.apache.hadoop.hbase.wal.WALFactory; 050import org.apache.hadoop.hbase.wal.WALStreamReader; 051import org.junit.After; 052import org.junit.AfterClass; 053import org.junit.Before; 054import org.junit.ClassRule; 055import org.junit.Rule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058import org.junit.rules.TestName; 059import org.junit.runner.RunWith; 060import org.junit.runners.Parameterized; 061import org.junit.runners.Parameterized.Parameter; 062import org.junit.runners.Parameterized.Parameters; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066/** 067 * Test for HBASE-17471. 068 * <p> 069 * MVCCPreAssign is added by HBASE-16698, but pre-assign mvcc is only used in put/delete path. Other 070 * write paths like increment/append still assign mvcc in ringbuffer's consumer thread. If put and 071 * increment are used parallel. Then seqid in WAL may not increase monotonically Disorder in wals 072 * will lead to data loss. 073 * <p> 074 * This case use two thread to put and increment at the same time in a single region. Then check the 075 * seqid in WAL. If seqid is wal is not monotonically increasing, this case will fail 076 */ 077@RunWith(Parameterized.class) 078@Category({ RegionServerTests.class, SmallTests.class }) 079public class TestWALMonotonicallyIncreasingSeqId { 080 081 @ClassRule 082 public static final HBaseClassTestRule CLASS_RULE = 083 HBaseClassTestRule.forClass(TestWALMonotonicallyIncreasingSeqId.class); 084 085 private final Logger LOG = LoggerFactory.getLogger(getClass()); 086 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 087 private static Path testDir = TEST_UTIL.getDataTestDir("TestWALMonotonicallyIncreasingSeqId"); 088 private WALFactory wals; 089 private FileSystem fileSystem; 090 private Configuration walConf; 091 private HRegion region; 092 093 @Parameter 094 public String walProvider; 095 096 @Rule 097 public TestName name = new TestName(); 098 099 @Parameters(name = "{index}: wal={0}") 100 public static List<Object[]> data() { 101 return Arrays.asList(new Object[] { "asyncfs" }, new Object[] { "filesystem" }); 102 } 103 104 private TableDescriptor getTableDesc(TableName tableName, byte[]... families) { 105 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 106 Arrays.stream(families) 107 .map( 108 f -> ColumnFamilyDescriptorBuilder.newBuilder(f).setMaxVersions(Integer.MAX_VALUE).build()) 109 .forEachOrdered(builder::setColumnFamily); 110 return builder.build(); 111 } 112 113 private HRegion initHRegion(TableDescriptor htd, byte[] startKey, byte[] stopKey, int replicaId) 114 throws IOException { 115 Configuration conf = TEST_UTIL.getConfiguration(); 116 conf.set("hbase.wal.provider", walProvider); 117 conf.setBoolean("hbase.hregion.mvcc.preassign", false); 118 Path tableDir = CommonFSUtils.getTableDir(testDir, htd.getTableName()); 119 120 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKey) 121 .setEndKey(stopKey).setReplicaId(replicaId).setRegionId(0).build(); 122 fileSystem = tableDir.getFileSystem(conf); 123 final Configuration walConf = new Configuration(conf); 124 CommonFSUtils.setRootDir(walConf, tableDir); 125 this.walConf = walConf; 126 wals = new WALFactory(walConf, "log_" + replicaId); 127 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 128 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 129 HRegion region = 130 HRegion.createHRegion(info, TEST_UTIL.getDefaultRootDirPath(), conf, htd, wals.getWAL(info)); 131 return region; 132 } 133 134 CountDownLatch latch = new CountDownLatch(1); 135 136 public class PutThread extends Thread { 137 HRegion region; 138 139 public PutThread(HRegion region) { 140 this.region = region; 141 } 142 143 @Override 144 public void run() { 145 try { 146 for (int i = 0; i < 100; i++) { 147 byte[] row = Bytes.toBytes("putRow" + i); 148 Put put = new Put(row); 149 put.addColumn("cf".getBytes(), Bytes.toBytes(0), Bytes.toBytes("")); 150 latch.await(); 151 region.batchMutate(new Mutation[] { put }); 152 Thread.sleep(10); 153 } 154 155 } catch (Throwable t) { 156 LOG.warn("Error happend when Increment: ", t); 157 } 158 } 159 } 160 161 public class IncThread extends Thread { 162 HRegion region; 163 164 public IncThread(HRegion region) { 165 this.region = region; 166 } 167 168 @Override 169 public void run() { 170 try { 171 for (int i = 0; i < 100; i++) { 172 byte[] row = Bytes.toBytes("incrementRow" + i); 173 Increment inc = new Increment(row); 174 inc.addColumn("cf".getBytes(), Bytes.toBytes(0), 1); 175 // inc.setDurability(Durability.ASYNC_WAL); 176 region.increment(inc); 177 latch.countDown(); 178 Thread.sleep(10); 179 } 180 181 } catch (Throwable t) { 182 LOG.warn("Error happend when Put: ", t); 183 } 184 } 185 } 186 187 @Before 188 public void setUp() throws IOException { 189 byte[][] families = new byte[][] { Bytes.toBytes("cf") }; 190 TableDescriptor htd = getTableDesc( 191 TableName.valueOf(name.getMethodName().replaceAll("[^0-9A-Za-z_]", "_")), families); 192 region = initHRegion(htd, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 0); 193 } 194 195 @After 196 public void tearDown() throws IOException { 197 if (region != null) { 198 region.close(); 199 } 200 } 201 202 @AfterClass 203 public static void tearDownAfterClass() throws IOException { 204 TEST_UTIL.cleanupTestDir(); 205 } 206 207 private WALStreamReader createReader(Path logPath, Path oldWalsDir) throws IOException { 208 try { 209 return wals.createStreamReader(fileSystem, logPath); 210 } catch (IOException e) { 211 return wals.createStreamReader(fileSystem, new Path(oldWalsDir, logPath.getName())); 212 } 213 } 214 215 @Test 216 public void testWALMonotonicallyIncreasingSeqId() throws Exception { 217 List<Thread> putThreads = new ArrayList<>(); 218 for (int i = 0; i < 1; i++) { 219 putThreads.add(new PutThread(region)); 220 } 221 IncThread incThread = new IncThread(region); 222 for (int i = 0; i < 1; i++) { 223 putThreads.get(i).start(); 224 } 225 incThread.start(); 226 incThread.join(); 227 228 Path logPath = ((AbstractFSWAL<?>) region.getWAL()).getCurrentFileName(); 229 region.getWAL().rollWriter(); 230 Thread.sleep(10); 231 Path hbaseDir = new Path(walConf.get(HConstants.HBASE_DIR)); 232 Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); 233 try (WALStreamReader reader = createReader(logPath, oldWalsDir)) { 234 long currentMaxSeqid = 0; 235 for (WAL.Entry e; (e = reader.next()) != null;) { 236 if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) { 237 long currentSeqid = e.getKey().getSequenceId(); 238 if (currentSeqid > currentMaxSeqid) { 239 currentMaxSeqid = currentSeqid; 240 } else { 241 fail("Current max Seqid is " + currentMaxSeqid 242 + ", but the next seqid in wal is smaller:" + currentSeqid); 243 } 244 } 245 } 246 } 247 } 248}