001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertThrows;
023
024import java.io.EOFException;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.List;
028import org.apache.hadoop.fs.FSDataInputStream;
029import org.apache.hadoop.fs.FSDataOutputStream;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.Cell.Type;
034import org.apache.hadoop.hbase.CellBuilderType;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.RegionInfo;
042import org.apache.hadoop.hbase.client.RegionInfoBuilder;
043import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.apache.hadoop.hbase.testclassification.RegionServerTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.CommonFSUtils;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.junit.AfterClass;
050import org.junit.BeforeClass;
051import org.junit.ClassRule;
052import org.junit.Test;
053import org.junit.experimental.categories.Category;
054
055/**
056 * In this test, we write a small WAL file first, and then generate partial WAL file which length is
057 * in range [0, fileLength)(we test all the possible length in the range), to see if we can
058 * successfully get the completed entries, and also get an EOF at the end.
059 * <p/>
060 * It is very important to make sure 3 things:
061 * <ul>
062 * <li>We do not get incorrect entries. Otherwise there will be data corruption.</li>
063 * <li>We can get all the completed entries, i.e, we do not miss some data. Otherwise there will be
064 * data loss.</li>
065 * <li>We will get an EOF finally, instead of a general IOException. Otherwise the split or
066 * replication will be stuck.</li>
067 * </ul>
068 */
069@Category({ RegionServerTests.class, MediumTests.class })
070public class TestParsePartialWALFile {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074    HBaseClassTestRule.forClass(TestParsePartialWALFile.class);
075
076  private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
077
078  private static FileSystem FS;
079
080  private static TableName TN = TableName.valueOf("test");
081  private static RegionInfo RI = RegionInfoBuilder.newBuilder(TN).build();
082  private static byte[] ROW = Bytes.toBytes("row");
083  private static byte[] FAMILY = Bytes.toBytes("family");
084  private static byte[] QUAL = Bytes.toBytes("qualifier");
085  private static byte[] VALUE = Bytes.toBytes("value");
086
087  @BeforeClass
088  public static void setUp() throws IOException {
089    UTIL.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
090    FS = FileSystem.getLocal(UTIL.getConfiguration());
091    if (!FS.mkdirs(UTIL.getDataTestDir())) {
092      throw new IOException("can not create " + UTIL.getDataTestDir());
093    }
094  }
095
096  @AfterClass
097  public static void tearDown() {
098    UTIL.cleanupTestDir();
099  }
100
101  private Path generateBrokenWALFile(byte[] content, int length) throws IOException {
102    Path walFile = UTIL.getDataTestDir("wal-" + length);
103    try (FSDataOutputStream out = FS.create(walFile)) {
104      out.write(content, 0, length);
105    }
106    return walFile;
107  }
108
109  private void assertEntryEquals(WAL.Entry entry, int index) {
110    WALKeyImpl key = entry.getKey();
111    assertEquals(TN, key.getTableName());
112    assertArrayEquals(RI.getEncodedNameAsBytes(), key.getEncodedRegionName());
113    WALEdit edit = entry.getEdit();
114    assertEquals(1, edit.getCells().size());
115    Cell cell = edit.getCells().get(0);
116    assertArrayEquals(ROW, CellUtil.cloneRow(cell));
117    assertArrayEquals(FAMILY, CellUtil.cloneFamily(cell));
118    if (index % 2 == 0) {
119      assertEquals(Type.Put, cell.getType());
120      assertArrayEquals(QUAL, CellUtil.cloneQualifier(cell));
121      assertArrayEquals(VALUE, CellUtil.cloneValue(cell));
122    } else {
123      assertEquals(Type.DeleteFamily, cell.getType());
124    }
125  }
126
127  private void testReadEntry(Path file, int entryCount) throws IOException {
128    try (
129      WALStreamReader reader = WALFactory.createStreamReader(FS, file, UTIL.getConfiguration())) {
130      for (int i = 0; i < entryCount; i++) {
131        assertEntryEquals(reader.next(), i);
132      }
133      assertThrows(EOFException.class, () -> reader.next());
134    }
135    try (WALTailingReader reader =
136      WALFactory.createTailingReader(FS, file, UTIL.getConfiguration(), -1)) {
137      for (int i = 0; i < entryCount; i++) {
138        WALTailingReader.Result result = reader.next(-1);
139        assertEquals(WALTailingReader.State.NORMAL, result.getState());
140        assertEntryEquals(result.getEntry(), i);
141      }
142      WALTailingReader.Result result = reader.next(-1);
143      assertEquals(WALTailingReader.State.EOF_AND_RESET, result.getState());
144    }
145  }
146
147  @Test
148  public void testPartialParse() throws Exception {
149    Path walFile = UTIL.getDataTestDir("wal");
150    long headerLength;
151    List<Long> endOffsets = new ArrayList<>();
152    try (WALProvider.Writer writer =
153      WALFactory.createWALWriter(FS, walFile, UTIL.getConfiguration())) {
154      headerLength = writer.getLength();
155      for (int i = 0; i < 3; i++) {
156        WALKeyImpl key = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, i,
157          EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
158        WALEdit edit = new WALEdit();
159        if (i % 2 == 0) {
160          edit.add(ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
161            .setRow(ROW).setFamily(FAMILY).setQualifier(QUAL).setValue(VALUE).build());
162        } else {
163          edit.add(ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
164            .setType(Type.DeleteFamily).setRow(ROW).setFamily(FAMILY).build());
165        }
166        writer.append(new WAL.Entry(key, edit));
167        writer.sync(true);
168        endOffsets.add(writer.getLength());
169      }
170    }
171    long fileLength = FS.getFileStatus(walFile).getLen();
172    byte[] content = new byte[(int) fileLength];
173    try (FSDataInputStream in = FS.open(walFile)) {
174      in.readFully(content);
175    }
176    // partial header, should throw WALHeaderEOFException
177    for (int i = 0; i < headerLength; i++) {
178      Path brokenFile = generateBrokenWALFile(content, i);
179      assertThrows(WALHeaderEOFException.class,
180        () -> WALFactory.createStreamReader(FS, brokenFile, UTIL.getConfiguration()));
181      assertThrows(WALHeaderEOFException.class,
182        () -> WALFactory.createTailingReader(FS, brokenFile, UTIL.getConfiguration(), -1));
183      FS.delete(brokenFile, false);
184    }
185    // partial WAL entries, should be able to read some entries and the last one we will get an EOF
186    for (int i = 0; i <= endOffsets.size(); i++) {
187      int startOffset;
188      int endOffset;
189      if (i == 0) {
190        startOffset = (int) headerLength;
191        endOffset = endOffsets.get(i).intValue();
192      } else if (i == endOffsets.size()) {
193        startOffset = endOffsets.get(i - 1).intValue();
194        endOffset = (int) fileLength;
195      } else {
196        startOffset = endOffsets.get(i - 1).intValue();
197        endOffset = endOffsets.get(i).intValue();
198      }
199      for (int j = startOffset; j < endOffset; j++) {
200        Path brokenFile = generateBrokenWALFile(content, j);
201        testReadEntry(brokenFile, i);
202        FS.delete(brokenFile, false);
203      }
204    }
205    // partial trailer, should be able to read all the entries but get an EOF when trying read
206    // again, as we do not know it is a trailer
207    for (int i = endOffsets.get(endOffsets.size() - 1).intValue(); i < fileLength; i++) {
208      Path brokenFile = generateBrokenWALFile(content, i);
209      testReadEntry(brokenFile, endOffsets.size());
210      FS.delete(brokenFile, false);
211    }
212  }
213}