/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;

/**
 * Enable compression and reset the WALEntryStream while reading in ReplicationSourceWALReader.
 * <p/>
 * This is used to confirm that we can recover correctly when hitting an EOFException in the
 * middle of reading a WAL entry while compression is enabled. See HBASE-27621 for more details.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestWALEntryStreamCompressionReset {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWALEntryStreamCompressionReset.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private static TableName TABLE_NAME = TableName.valueOf("reset");

  private static RegionInfo REGION_INFO = RegionInfoBuilder.newBuilder(TABLE_NAME).build();

  private static byte[] FAMILY = Bytes.toBytes("family");

  private static MultiVersionConcurrencyControl MVCC = new MultiVersionConcurrencyControl();

  private static NavigableMap<byte[], Integer> SCOPE;

  private static String GROUP_ID = "group";

  private static FileSystem FS;

  private static ReplicationSource SOURCE;

  private static MetricsSource METRICS_SOURCE;

  private static ReplicationSourceLogQueue LOG_QUEUE;

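  // TEMPLATE_WAL_FILE holds the pre-generated, compressed WAL content; WAL_FILE is the file the
  // reader tails while the test replays the template bytes into it piece by piece.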
  private static Path TEMPLATE_WAL_FILE;

  private static int END_OFFSET_OF_WAL_ENTRIES;

  private static Path WAL_FILE;

  private static volatile long WAL_LENGTH;

  private static ReplicationSourceWALReader READER;

  // Returns the WAL path and the end offset of the last WAL entry.
  private static Pair<Path, Long> generateWAL() throws Exception {
    Path path = UTIL.getDataTestDir("wal");
    ProtobufLogWriter writer = new ProtobufLogWriter();
    writer.init(FS, path, UTIL.getConfiguration(), false, FS.getDefaultBlockSize(path), null);
    for (int i = 0; i < Byte.MAX_VALUE; i++) {
      WALEdit edit = new WALEdit();
      WALEditInternalHelper.addExtendedCell(edit,
        ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
          .setRow(Bytes.toBytes(i)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier-" + i))
          .setValue(Bytes.toBytes("v-" + i)).build());
      writer.append(new WAL.Entry(new WALKeyImpl(REGION_INFO.getEncodedNameAsBytes(), TABLE_NAME,
        EnvironmentEdgeManager.currentTime(), MVCC, SCOPE), edit));
    }

    WALEdit edit2 = new WALEdit();
    WALEditInternalHelper.addExtendedCell(edit2,
      ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
        .setRow(Bytes.toBytes(-1)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier"))
        .setValue(Bytes.toBytes("vv")).build());
    WALEditInternalHelper.addExtendedCell(edit2,
      ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
        .setRow(Bytes.toBytes(-1)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier-1"))
        .setValue(Bytes.toBytes("vvv")).build());
    writer.append(new WAL.Entry(new WALKeyImpl(REGION_INFO.getEncodedNameAsBytes(), TABLE_NAME,
      EnvironmentEdgeManager.currentTime(), MVCC, SCOPE), edit2));
    writer.sync(false);
    long offset = writer.getSyncedLength();
    writer.close();
    return Pair.newPair(path, offset);
  }

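  // Generates the compressed template WAL once, then wires up a mocked ReplicationSource and a
  // log queue pointing at WAL_FILE, which testReset() fills with the template bytes.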
  @BeforeClass
  public static void setUp() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    FS = UTIL.getTestFileSystem();
    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
    conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
    conf.setInt("replication.source.maxretriesmultiplier", 1);
    FS.mkdirs(UTIL.getDataTestDir());
    Pair<Path, Long> pair = generateWAL();
    TEMPLATE_WAL_FILE = pair.getFirst();
    END_OFFSET_OF_WAL_ENTRIES = pair.getSecond().intValue();
    WAL_FILE = UTIL.getDataTestDir("rep_source");

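    // Mock the replication source; its WALFileLengthProvider reports WAL_LENGTH, so the reader
    // never reads beyond what the test has written and flushed so far.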
    METRICS_SOURCE = new MetricsSource("reset");
    SOURCE = mock(ReplicationSource.class);
    when(SOURCE.isPeerEnabled()).thenReturn(true);
    when(SOURCE.getWALFileLengthProvider()).thenReturn(p -> OptionalLong.of(WAL_LENGTH));
    when(SOURCE.getServerWALsBelongTo())
      .thenReturn(ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime()));
    when(SOURCE.getSourceMetrics()).thenReturn(METRICS_SOURCE);
    ReplicationSourceManager rsm = new ReplicationSourceManager(null, null, conf, null, null, null,
      null, null, null, null, mock(MetricsReplicationGlobalSourceSource.class));
    when(SOURCE.getSourceManager()).thenReturn(rsm);

    LOG_QUEUE = new ReplicationSourceLogQueue(conf, METRICS_SOURCE, SOURCE);
    LOG_QUEUE.enqueueLog(WAL_FILE, GROUP_ID);
    READER = new ReplicationSourceWALReader(FS, conf, LOG_QUEUE, 0, e -> e, SOURCE, GROUP_ID);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    READER.setReaderRunning(false);
    READER.join();
    UTIL.cleanupTestDir();
  }

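  // Writes all but the last 15 bytes of the template WAL, verifies the reader returns every entry
  // except the incomplete last one, then writes the remaining bytes and verifies the last entry
  // can be read as well.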
  private void test(byte[] content, FSDataOutputStream out) throws Exception {
    // minus 15 so the last entry is incomplete (its second cell is truncated)
    // 15 is a magic number here: we want the reader to parse the first cell but not the second
    // cell, especially not the qualifier of the second cell. The value of the second cell is
    // 'vvv', which is 3 bytes, plus an 8 byte timestamp, and also the qualifier, family and row
    // (which should have been compressed), so 15 is a proper value; of course 14 or 16 could also
    // work here.
    out.write(content, 0, END_OFFSET_OF_WAL_ENTRIES - 15);
    out.hflush();
    WAL_LENGTH = END_OFFSET_OF_WAL_ENTRIES - 15;
    READER.start();
    List<WAL.Entry> entries = new ArrayList<>();
    for (;;) {
      WALEntryBatch batch = READER.poll(1000);
      if (batch == null) {
        break;
      }
      entries.addAll(batch.getWalEntries());
    }
    // should return all the entries except the last one
    assertEquals(Byte.MAX_VALUE, entries.size());
    for (int i = 0; i < Byte.MAX_VALUE; i++) {
      WAL.Entry entry = entries.get(i);
      assertEquals(1, entry.getEdit().size());
      Cell cell = entry.getEdit().getCells().get(0);
      assertEquals(i, Bytes.toInt(cell.getRowArray(), cell.getRowOffset()));
      assertEquals(Bytes.toString(FAMILY),
        Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
      assertEquals("qualifier-" + i, Bytes.toString(cell.getQualifierArray(),
        cell.getQualifierOffset(), cell.getQualifierLength()));
      assertEquals("v-" + i,
        Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
    }

    // confirm that we cannot get the last one since it is incomplete
    assertNull(READER.poll(1000));
    // write the remaining 15 bytes out
    out.write(content, END_OFFSET_OF_WAL_ENTRIES - 15, 15);
    out.hflush();
    WAL_LENGTH = END_OFFSET_OF_WAL_ENTRIES;

    // should get the last entry
    WALEntryBatch batch = READER.poll(10000);
    assertEquals(1, batch.getNbEntries());
    WAL.Entry entry = batch.getWalEntries().get(0);
    assertEquals(2, entry.getEdit().size());
    Cell cell2 = entry.getEdit().getCells().get(0);
    assertEquals(-1, Bytes.toInt(cell2.getRowArray(), cell2.getRowOffset()));
    assertEquals(Bytes.toString(FAMILY),
      Bytes.toString(cell2.getFamilyArray(), cell2.getFamilyOffset(), cell2.getFamilyLength()));
    assertEquals("qualifier", Bytes.toString(cell2.getQualifierArray(), cell2.getQualifierOffset(),
      cell2.getQualifierLength()));
    assertEquals("vv",
      Bytes.toString(cell2.getValueArray(), cell2.getValueOffset(), cell2.getValueLength()));

    Cell cell3 = entry.getEdit().getCells().get(1);
    assertEquals(-1, Bytes.toInt(cell3.getRowArray(), cell3.getRowOffset()));
    assertEquals(Bytes.toString(FAMILY),
      Bytes.toString(cell3.getFamilyArray(), cell3.getFamilyOffset(), cell3.getFamilyLength()));
    assertEquals("qualifier-1", Bytes.toString(cell3.getQualifierArray(),
      cell3.getQualifierOffset(), cell3.getQualifierLength()));
    assertEquals("vvv",
      Bytes.toString(cell3.getValueArray(), cell3.getValueOffset(), cell3.getValueLength()));
  }

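  // Copies the template WAL bytes and replays them into WAL_FILE through test(), first truncated
  // and then in full, exercising the WALEntryStream reset with compression enabled.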
  @Test
  public void testReset() throws Exception {
    byte[] content;
    try (FSDataInputStream in = FS.open(TEMPLATE_WAL_FILE)) {
      content = ByteStreams.toByteArray(in);
    }
    try (FSDataOutputStream out = FS.create(WAL_FILE)) {
      test(content, out);
    }
  }
}