001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.Arrays; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtil; 030import org.apache.hadoop.hbase.ServerName; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 033import org.apache.hadoop.hbase.client.Durability; 034import org.apache.hadoop.hbase.client.Increment; 035import org.apache.hadoop.hbase.client.Put; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.client.RegionInfoBuilder; 038import org.apache.hadoop.hbase.client.Result; 039import org.apache.hadoop.hbase.client.TableDescriptor; 040import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 041import org.apache.hadoop.hbase.regionserver.ChunkCreator; 042import org.apache.hadoop.hbase.regionserver.HRegion; 043import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 044import org.apache.hadoop.hbase.testclassification.MediumTests; 045import org.apache.hadoop.hbase.testclassification.RegionServerTests; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.hbase.util.CommonFSUtils; 048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 049import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 050import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader; 051import org.apache.hadoop.hbase.wal.WAL; 052import org.apache.hadoop.hbase.wal.WALFactory; 053import org.apache.hadoop.hdfs.MiniDFSCluster; 054import org.junit.After; 055import org.junit.AfterClass; 056import org.junit.Before; 057import org.junit.BeforeClass; 058import org.junit.ClassRule; 059import org.junit.Rule; 060import org.junit.Test; 061import org.junit.experimental.categories.Category; 062import org.junit.rules.TestName; 063import org.junit.runner.RunWith; 064import org.junit.runners.Parameterized; 065import org.junit.runners.Parameterized.Parameter; 066import org.junit.runners.Parameterized.Parameters; 067 068/** 069 * Tests for WAL write durability 070 */ 071@RunWith(Parameterized.class) 072@Category({ RegionServerTests.class, MediumTests.class }) 073public class TestDurability { 074 075 @ClassRule 076 public static final HBaseClassTestRule CLASS_RULE = 077 HBaseClassTestRule.forClass(TestDurability.class); 078 079 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 080 private static FileSystem FS; 081 private static MiniDFSCluster CLUSTER; 082 private static Configuration CONF; 083 private static Path DIR; 084 085 private static byte[] FAMILY = Bytes.toBytes("family"); 086 private static byte[] ROW = Bytes.toBytes("row"); 087 private static byte[] COL = Bytes.toBytes("col"); 088 089 @Parameter 090 public String walProvider; 091 092 @Rule 093 public TestName name = new TestName(); 094 095 @Parameters(name = "{index}: provider={0}") 096 public static Iterable<Object[]> data() { 097 return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" }); 098 } 099 100 @BeforeClass 101 public static void setUpBeforeClass() throws Exception { 102 CONF = TEST_UTIL.getConfiguration(); 103 TEST_UTIL.startMiniDFSCluster(1); 104 105 CLUSTER = TEST_UTIL.getDFSCluster(); 106 FS = CLUSTER.getFileSystem(); 107 DIR = TEST_UTIL.getDataTestDirOnTestFS("TestDurability"); 108 CommonFSUtils.setRootDir(CONF, DIR); 109 } 110 111 @AfterClass 112 public static void tearDownAfterClass() throws Exception { 113 TEST_UTIL.shutdownMiniCluster(); 114 } 115 116 @Before 117 public void setUp() { 118 CONF.set(WALFactory.WAL_PROVIDER, walProvider); 119 } 120 121 @After 122 public void tearDown() throws IOException { 123 FS.delete(DIR, true); 124 } 125 126 @Test 127 public void testDurability() throws Exception { 128 WALFactory wals = new WALFactory(CONF, 129 ServerName.valueOf("TestDurability", 16010, EnvironmentEdgeManager.currentTime()).toString()); 130 HRegion region = createHRegion(wals, Durability.USE_DEFAULT); 131 WAL wal = region.getWAL(); 132 HRegion deferredRegion = createHRegion(region.getTableDescriptor(), region.getRegionInfo(), 133 "deferredRegion", wal, Durability.ASYNC_WAL); 134 135 region.put(newPut(null)); 136 verifyWALCount(wals, wal, 1); 137 138 // a put through the deferred table does not write to the wal immediately, 139 // but maybe has been successfully sync-ed by the underlying AsyncWriter + 140 // AsyncFlusher thread 141 deferredRegion.put(newPut(null)); 142 // but will after we sync the wal 143 wal.sync(); 144 verifyWALCount(wals, wal, 2); 145 146 // a put through a deferred table will be sync with the put sync'ed put 147 deferredRegion.put(newPut(null)); 148 wal.sync(); 149 verifyWALCount(wals, wal, 3); 150 region.put(newPut(null)); 151 verifyWALCount(wals, wal, 4); 152 153 // a put through a deferred table will be sync with the put sync'ed put 154 deferredRegion.put(newPut(Durability.USE_DEFAULT)); 155 wal.sync(); 156 verifyWALCount(wals, wal, 5); 157 region.put(newPut(Durability.USE_DEFAULT)); 158 verifyWALCount(wals, wal, 6); 159 160 // SKIP_WAL never writes to the wal 161 region.put(newPut(Durability.SKIP_WAL)); 162 deferredRegion.put(newPut(Durability.SKIP_WAL)); 163 verifyWALCount(wals, wal, 6); 164 wal.sync(); 165 verifyWALCount(wals, wal, 6); 166 167 // Async overrides sync table default 168 region.put(newPut(Durability.ASYNC_WAL)); 169 deferredRegion.put(newPut(Durability.ASYNC_WAL)); 170 wal.sync(); 171 verifyWALCount(wals, wal, 8); 172 173 // sync overrides async table default 174 region.put(newPut(Durability.SYNC_WAL)); 175 deferredRegion.put(newPut(Durability.SYNC_WAL)); 176 verifyWALCount(wals, wal, 10); 177 178 // fsync behaves like sync 179 region.put(newPut(Durability.FSYNC_WAL)); 180 deferredRegion.put(newPut(Durability.FSYNC_WAL)); 181 verifyWALCount(wals, wal, 12); 182 } 183 184 @Test 185 public void testIncrement() throws Exception { 186 byte[] row1 = Bytes.toBytes("row1"); 187 byte[] col1 = Bytes.toBytes("col1"); 188 byte[] col2 = Bytes.toBytes("col2"); 189 byte[] col3 = Bytes.toBytes("col3"); 190 191 // Setting up region 192 WALFactory wals = new WALFactory(CONF, 193 ServerName.valueOf("TestIncrement", 16010, EnvironmentEdgeManager.currentTime()).toString()); 194 HRegion region = createHRegion(wals, Durability.USE_DEFAULT); 195 WAL wal = region.getWAL(); 196 197 // col1: amount = 0, 1 write back to WAL 198 Increment inc1 = new Increment(row1); 199 inc1.addColumn(FAMILY, col1, 0); 200 Result res = region.increment(inc1); 201 assertEquals(1, res.size()); 202 assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col1))); 203 verifyWALCount(wals, wal, 1); 204 205 // col1: amount = 1, 1 write back to WAL 206 inc1 = new Increment(row1); 207 inc1.addColumn(FAMILY, col1, 1); 208 res = region.increment(inc1); 209 assertEquals(1, res.size()); 210 assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1))); 211 verifyWALCount(wals, wal, 2); 212 213 // col1: amount = 0, 1 write back to WAL 214 inc1 = new Increment(row1); 215 inc1.addColumn(FAMILY, col1, 0); 216 res = region.increment(inc1); 217 assertEquals(1, res.size()); 218 assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1))); 219 verifyWALCount(wals, wal, 3); 220 // col1: amount = 0, col2: amount = 0, col3: amount = 0 221 // 1 write back to WAL 222 inc1 = new Increment(row1); 223 inc1.addColumn(FAMILY, col1, 0); 224 inc1.addColumn(FAMILY, col2, 0); 225 inc1.addColumn(FAMILY, col3, 0); 226 res = region.increment(inc1); 227 assertEquals(3, res.size()); 228 assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1))); 229 assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col2))); 230 assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col3))); 231 verifyWALCount(wals, wal, 4); 232 233 // col1: amount = 5, col2: amount = 4, col3: amount = 3 234 // 1 write back to WAL 235 inc1 = new Increment(row1); 236 inc1.addColumn(FAMILY, col1, 5); 237 inc1.addColumn(FAMILY, col2, 4); 238 inc1.addColumn(FAMILY, col3, 3); 239 res = region.increment(inc1); 240 assertEquals(3, res.size()); 241 assertEquals(6, Bytes.toLong(res.getValue(FAMILY, col1))); 242 assertEquals(4, Bytes.toLong(res.getValue(FAMILY, col2))); 243 assertEquals(3, Bytes.toLong(res.getValue(FAMILY, col3))); 244 verifyWALCount(wals, wal, 5); 245 } 246 247 /** 248 * Test when returnResults set to false in increment it should not return the result instead it 249 * resturn null. 250 */ 251 @Test 252 public void testIncrementWithReturnResultsSetToFalse() throws Exception { 253 byte[] row1 = Bytes.toBytes("row1"); 254 byte[] col1 = Bytes.toBytes("col1"); 255 256 // Setting up region 257 WALFactory wals = 258 new WALFactory(CONF, ServerName.valueOf("testIncrementWithReturnResultsSetToFalse", 16010, 259 EnvironmentEdgeManager.currentTime()).toString()); 260 HRegion region = createHRegion(wals, Durability.USE_DEFAULT); 261 262 Increment inc1 = new Increment(row1); 263 inc1.setReturnResults(false); 264 inc1.addColumn(FAMILY, col1, 1); 265 Result res = region.increment(inc1); 266 assertTrue(res.isEmpty()); 267 } 268 269 private Put newPut(Durability durability) { 270 Put p = new Put(ROW); 271 p.addColumn(FAMILY, COL, COL); 272 if (durability != null) { 273 p.setDurability(durability); 274 } 275 return p; 276 } 277 278 private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception { 279 Path walPath = AbstractFSWALProvider.getCurrentFileName(log); 280 assertEquals(expected, NoEOFWALStreamReader.count(wals, FS, walPath)); 281 } 282 283 // lifted from TestAtomicOperation 284 private HRegion createHRegion(WALFactory wals, Durability durability) throws IOException { 285 TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^A-Za-z0-9-_]", "_")); 286 TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName) 287 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); 288 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 289 Path path = new Path(DIR, tableName.getNameAsString()); 290 if (FS.exists(path)) { 291 if (!FS.delete(path, true)) { 292 throw new IOException("Failed delete of " + path); 293 } 294 } 295 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 296 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 297 return HRegion.createHRegion(info, path, CONF, htd, wals.getWAL(info)); 298 } 299 300 private HRegion createHRegion(TableDescriptor td, RegionInfo info, String dir, WAL wal, 301 Durability durability) throws IOException { 302 Path path = new Path(DIR, dir); 303 if (FS.exists(path)) { 304 if (!FS.delete(path, true)) { 305 throw new IOException("Failed delete of " + path); 306 } 307 } 308 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 309 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 310 return HRegion.createHRegion(info, path, CONF, td, wal); 311 } 312}