/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

/**
 * Tests for WAL write durability: verifies that per-mutation {@link Durability} settings and the
 * table-level default interact correctly when deciding whether and when edits reach the WAL.
 */
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, MediumTests.class })
public class TestDurability {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestDurability.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static FileSystem FS;
  private static MiniDFSCluster CLUSTER;
  private static Configuration CONF;
  private static Path DIR;

  private static final byte[] FAMILY = Bytes.toBytes("family");
  private static final byte[] ROW = Bytes.toBytes("row");
  private static final byte[] COL = Bytes.toBytes("col");

  @Parameter
  public String walProvider;

  @Rule
  public TestName name = new TestName();

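  // Each test runs once per WAL provider returned by data(): the default provider and the
  // explicitly configured asyncfs provider.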
  @Parameters(name = "{index}: provider={0}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" });
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
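    // All WAL files written by these tests live on a single-node HDFS mini cluster.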
    TEST_UTIL.startMiniDFSCluster(1);

    CLUSTER = TEST_UTIL.getDFSCluster();
    FS = CLUSTER.getFileSystem();
    DIR = TEST_UTIL.getDataTestDirOnTestFS("TestDurability");
    CommonFSUtils.setRootDir(CONF, DIR);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() {
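    // Select the WAL implementation for this parameterized run; the WALFactory created inside
    // each test picks the provider up from the configuration.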
    CONF.set(WALFactory.WAL_PROVIDER, walProvider);
  }

  @After
  public void tearDown() throws IOException {
    FS.delete(DIR, true);
  }

  @Test
  public void testDurability() throws Exception {
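    // A unique factory id (name + current time) gives each run its own WAL directory.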
    WALFactory wals = new WALFactory(CONF,
      ServerName.valueOf("TestDurability", 16010, EnvironmentEdgeManager.currentTime()).toString());
    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
    WAL wal = region.getWAL();
    HRegion deferredRegion = createHRegion(region.getTableDescriptor(), region.getRegionInfo(),
      "deferredRegion", wal, Durability.ASYNC_WAL);

    region.put(newPut(null));
    verifyWALCount(wals, wal, 1);

    // A put through the deferred table does not reach the WAL synchronously, though it may
    // already have been written out by the underlying AsyncWriter/AsyncFlusher threads.
    deferredRegion.put(newPut(null));
    // It is guaranteed to be there once we sync the WAL.
    wal.sync();
    verifyWALCount(wals, wal, 2);

    // A put through the deferred table is written out together with the next sync'ed put.
    deferredRegion.put(newPut(null));
    wal.sync();
    verifyWALCount(wals, wal, 3);
    region.put(newPut(null));
    verifyWALCount(wals, wal, 4);

    // USE_DEFAULT on the mutation falls back to the table default, so this behaves exactly like
    // the previous block.
    deferredRegion.put(newPut(Durability.USE_DEFAULT));
    wal.sync();
    verifyWALCount(wals, wal, 5);
    region.put(newPut(Durability.USE_DEFAULT));
    verifyWALCount(wals, wal, 6);

    // SKIP_WAL never writes to the WAL
    region.put(newPut(Durability.SKIP_WAL));
    deferredRegion.put(newPut(Durability.SKIP_WAL));
    verifyWALCount(wals, wal, 6);
    wal.sync();
    verifyWALCount(wals, wal, 6);

    // ASYNC_WAL on the mutation overrides the sync table default
    region.put(newPut(Durability.ASYNC_WAL));
    deferredRegion.put(newPut(Durability.ASYNC_WAL));
    wal.sync();
    verifyWALCount(wals, wal, 8);

    // SYNC_WAL on the mutation overrides the async table default
    region.put(newPut(Durability.SYNC_WAL));
    deferredRegion.put(newPut(Durability.SYNC_WAL));
    verifyWALCount(wals, wal, 10);

    // FSYNC_WAL behaves like SYNC_WAL
    region.put(newPut(Durability.FSYNC_WAL));
    deferredRegion.put(newPut(Durability.FSYNC_WAL));
    verifyWALCount(wals, wal, 12);
  }

  @Test
  public void testIncrement() throws Exception {
    byte[] row1 = Bytes.toBytes("row1");
    byte[] col1 = Bytes.toBytes("col1");
    byte[] col2 = Bytes.toBytes("col2");
    byte[] col3 = Bytes.toBytes("col3");

    // Setting up region
    WALFactory wals = new WALFactory(CONF,
      ServerName.valueOf("TestIncrement", 16010, EnvironmentEdgeManager.currentTime()).toString());
    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
    WAL wal = region.getWAL();

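    // Each Increment is applied as a single WAL append, no matter how many columns it touches;
    // the expected WAL entry counts below rely on this.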
    // col1: amount = 0, expect 1 new WAL entry
    Increment inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 0);
    Result res = region.increment(inc1);
    assertEquals(1, res.size());
    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col1)));
    verifyWALCount(wals, wal, 1);

    // col1: amount = 1, expect 1 new WAL entry
    inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 1);
    res = region.increment(inc1);
    assertEquals(1, res.size());
    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
    verifyWALCount(wals, wal, 2);

    // col1: amount = 0, expect 1 new WAL entry
    inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 0);
    res = region.increment(inc1);
    assertEquals(1, res.size());
    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
    verifyWALCount(wals, wal, 3);
    // col1: amount = 0, col2: amount = 0, col3: amount = 0; expect 1 new WAL entry for the batch
    inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 0);
    inc1.addColumn(FAMILY, col2, 0);
    inc1.addColumn(FAMILY, col3, 0);
    res = region.increment(inc1);
    assertEquals(3, res.size());
    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col2)));
    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col3)));
    verifyWALCount(wals, wal, 4);

    // col1: amount = 5, col2: amount = 4, col3: amount = 3; expect 1 new WAL entry for the batch
    inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 5);
    inc1.addColumn(FAMILY, col2, 4);
    inc1.addColumn(FAMILY, col3, 3);
    res = region.increment(inc1);
    assertEquals(3, res.size());
    assertEquals(6, Bytes.toLong(res.getValue(FAMILY, col1)));
    assertEquals(4, Bytes.toLong(res.getValue(FAMILY, col2)));
    assertEquals(3, Bytes.toLong(res.getValue(FAMILY, col3)));
    verifyWALCount(wals, wal, 5);
  }

  /**
   * Tests that an Increment with returnResults set to false does not send the incremented values
   * back to the client; the returned Result is empty.
   */
  @Test
  public void testIncrementWithReturnResultsSetToFalse() throws Exception {
    byte[] row1 = Bytes.toBytes("row1");
    byte[] col1 = Bytes.toBytes("col1");

    // Setting up region
    WALFactory wals =
      new WALFactory(CONF, ServerName.valueOf("testIncrementWithReturnResultsSetToFalse", 16010,
        EnvironmentEdgeManager.currentTime()).toString());
    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);

    Increment inc1 = new Increment(row1);
    inc1.setReturnResults(false);
    inc1.addColumn(FAMILY, col1, 1);
    Result res = region.increment(inc1);
    assertTrue(res.isEmpty());
  }

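  /**
   * Creates a Put for ROW. A null durability leaves the Put at its default (USE_DEFAULT), so the
   * effective durability comes from the table descriptor.
   */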
  private Put newPut(Durability durability) {
    Put p = new Put(ROW);
    p.addColumn(FAMILY, COL, COL);
    if (durability != null) {
      p.setDurability(durability);
    }
    return p;
  }

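  /**
   * Reads back the WAL's current file and asserts that it contains exactly the expected number of
   * entries.
   */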
  private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception {
    Path walPath = AbstractFSWALProvider.getCurrentFileName(log);
    assertEquals(expected, NoEOFWALStreamReader.count(wals, FS, walPath));
  }

  // lifted from TestAtomicOperation
  private HRegion createHRegion(WALFactory wals, Durability durability) throws IOException {
    TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^A-Za-z0-9-_]", "_"));
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setDurability(durability).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
    Path path = new Path(DIR, tableName.getNameAsString());
    if (FS.exists(path)) {
      if (!FS.delete(path, true)) {
        throw new IOException("Failed delete of " + path);
      }
    }
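    // The MemStoreLAB chunk pool must be initialized before a region (and its memstores) is
    // created.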
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    return HRegion.createHRegion(info, path, CONF, htd, wals.getWAL(info));
  }

  private HRegion createHRegion(TableDescriptor td, RegionInfo info, String dir, WAL wal,
    Durability durability) throws IOException {
    Path path = new Path(DIR, dir);
    if (FS.exists(path)) {
      if (!FS.delete(path, true)) {
        throw new IOException("Failed delete of " + path);
      }
    }
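    // Same MemStoreLAB initialization as in the WALFactory-based createHRegion above.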
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    // Apply the requested durability; without this the parameter would be silently ignored and
    // the "deferred" region in testDurability would not actually default to ASYNC_WAL.
    td = TableDescriptorBuilder.newBuilder(td).setDurability(durability).build();
    return HRegion.createHRegion(info, path, CONF, td, wal);
  }
}