/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
/**
 * WAL tests that can be reused across providers.
 */
public abstract class AbstractTestProtobufLog<W extends Closeable> {
  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

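  // Per-test fixtures: the mini-DFS filesystem, the directory holding this test's WAL files,
  // and the WALFactory used to open stream readers on them.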
  protected FileSystem fs;
  protected Path dir;
  protected WALFactory wals;

  @Rule
  public final TestName currentTest = new TestName();

  @Before
  public void setUp() throws Exception {
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    dir = new Path(TEST_UTIL.createRootDir(), currentTest.getMethodName());
    wals = new WALFactory(TEST_UTIL.getConfiguration(), currentTest.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
    wals.close();
    FileStatus[] entries = fs.listStatus(new Path("/"));
    for (FileStatus entry : entries) {
      fs.delete(entry.getPath(), true);
    }
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
    // needed for testAppendClose()
    // quicker heartbeat interval for faster DN death notification
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);

    // faster failover with cluster.shutdown();fs.close() idiom
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
      SampleRegionWALCoprocessor.class.getName());
    TEST_UTIL.startMiniDFSCluster(3);
  }

107
108  @AfterClass
109  public static void tearDownAfterClass() throws Exception {
110    TEST_UTIL.shutdownMiniCluster();
111  }
112
  /**
   * Reads the WAL with and without a WALTrailer.
   */
  @Test
  public void testWALTrailer() throws IOException {
    // Make sure that the serialized size of an empty WALTrailer is 0; we rely on this
    // assumption when reading a partial WALTrailer.
    assertEquals(0, WALTrailer.newBuilder().build().getSerializedSize());
    // Read with a trailer.
    doRead(true);
    // Read without a trailer.
    doRead(false);
  }

  /**
   * Appends entries to the WAL and reads them back.
   * @param withTrailer If true, the WAL writer is closed before reading so that a trailer is
   *                    appended to the WAL. Otherwise, reading starts right after the sync call,
   *                    which means the reader is not aware of the trailer. In this scenario, if
   *                    the reader tries to read past the last entry, its next() call reports end
   *                    of stream instead of returning the trailer as an entry.
   */
  private void doRead(boolean withTrailer) throws IOException {
    final int columnCount = 5;
    final int recordCount = 5;
    final TableName tableName = TableName.valueOf("tablename");
    final byte[] row = Bytes.toBytes("row");
    long timestamp = EnvironmentEdgeManager.currentTime();
    Path path = new Path(dir, "tempwal");
    // Delete the log if it already exists; test-only cleanup.
    fs.delete(path, true);
    HRegionInfo hri =
      new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    HTableDescriptor htd = new HTableDescriptor(tableName);
    fs.mkdirs(dir);
    W writer = null;
    try {
      // Write the log in pb format.
      writer = createWriter(path);
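      // The loop index doubles as the WAL sequence id, so entries are appended with
      // increasing sequence numbers.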
      for (int i = 0; i < recordCount; ++i) {
        WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, i, timestamp,
          HConstants.DEFAULT_CLUSTER_ID);
        WALEdit edit = new WALEdit();
        for (int j = 0; j < columnCount; ++j) {
          if (i == 0) {
            htd.addFamily(new HColumnDescriptor("column" + j));
          }
          String value = i + "" + j;
          edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
        }
        append(writer, new WAL.Entry(key, edit));
      }
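      // Sync so the appended edits are visible to a reader even while the writer stays open.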
      sync(writer);
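      // Closing the writer is what appends the WALTrailer; leaving it open simulates a WAL
      // file without a trailer.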
      if (withTrailer) {
        writer.close();
        writer = null;
      }
      // Now read the log using standard means.
      try (ProtobufWALStreamReader reader =
        (ProtobufWALStreamReader) wals.createStreamReader(fs, path)) {
        if (withTrailer) {
          assertNotNull(reader.trailer);
        } else {
          assertNull(reader.trailer);
        }
        for (int i = 0; i < recordCount; ++i) {
          WAL.Entry entry = reader.next();
          assertNotNull(entry);
          assertEquals(columnCount, entry.getEdit().size());
          assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
          assertEquals(tableName, entry.getKey().getTableName());
          int idx = 0;
          for (Cell val : entry.getEdit().getCells()) {
            assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
              val.getRowLength()));
            String value = i + "" + idx;
            assertArrayEquals(Bytes.toBytes(value), CellUtil.cloneValue(val));
            idx++;
          }
        }
        if (withTrailer) {
          assertNotNull(reader.trailer);
        } else {
          assertNull(reader.trailer);
        }
      }
    } finally {
      Closeables.close(writer, true);
    }
  }

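  /**
   * Creates the provider-specific protobuf WAL writer under test for the given path.
   */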
  protected abstract W createWriter(Path path) throws IOException;

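  /**
   * Appends a single entry to the WAL through the provider-specific writer.
   */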
  protected abstract void append(W writer, WAL.Entry entry) throws IOException;

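  /**
   * Syncs outstanding appends so a reader opened on the same path can see them.
   */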
  protected abstract void sync(W writer) throws IOException;
}