001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNull; 022import static org.mockito.Mockito.mock; 023import static org.mockito.Mockito.when; 024 025import java.util.ArrayList; 026import java.util.List; 027import java.util.NavigableMap; 028import java.util.OptionalLong; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FSDataInputStream; 031import org.apache.hadoop.fs.FSDataOutputStream; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.CellBuilderType; 036import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtil; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.ServerName; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.RegionInfo; 043import org.apache.hadoop.hbase.client.RegionInfoBuilder; 044import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 045import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; 046import org.apache.hadoop.hbase.testclassification.MediumTests; 047import org.apache.hadoop.hbase.testclassification.ReplicationTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.CommonFSUtils; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.util.Pair; 052import org.apache.hadoop.hbase.wal.WAL; 053import org.apache.hadoop.hbase.wal.WALEdit; 054import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 055import org.apache.hadoop.hbase.wal.WALKeyImpl; 056import org.junit.AfterClass; 057import org.junit.BeforeClass; 058import org.junit.ClassRule; 059import org.junit.Test; 060import org.junit.experimental.categories.Category; 061 062import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; 063 064/** 065 * Enable compression and reset the WALEntryStream while reading in ReplicationSourceWALReader. 066 * <p/> 067 * This is used to confirm that we can work well when hitting EOFException in the middle when 068 * reading a WAL entry, when compression is enabled. See HBASE-27621 for more details. 069 */ 070@Category({ ReplicationTests.class, MediumTests.class }) 071public class TestWALEntryStreamCompressionReset { 072 073 @ClassRule 074 public static final HBaseClassTestRule CLASS_RULE = 075 HBaseClassTestRule.forClass(TestWALEntryStreamCompressionReset.class); 076 077 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 078 079 private static TableName TABLE_NAME = TableName.valueOf("reset"); 080 081 private static RegionInfo REGION_INFO = RegionInfoBuilder.newBuilder(TABLE_NAME).build(); 082 083 private static byte[] FAMILY = Bytes.toBytes("family"); 084 085 private static MultiVersionConcurrencyControl MVCC = new MultiVersionConcurrencyControl(); 086 087 private static NavigableMap<byte[], Integer> SCOPE; 088 089 private static String GROUP_ID = "group"; 090 091 private static FileSystem FS; 092 093 private static ReplicationSource SOURCE; 094 095 private static MetricsSource METRICS_SOURCE; 096 097 private static ReplicationSourceLogQueue LOG_QUEUE; 098 099 private static Path TEMPLATE_WAL_FILE; 100 101 private static int END_OFFSET_OF_WAL_ENTRIES; 102 103 private static Path WAL_FILE; 104 105 private static volatile long WAL_LENGTH; 106 107 private static ReplicationSourceWALReader READER; 108 109 // return the wal path, and also the end offset of last wal entry 110 private static Pair<Path, Long> generateWAL() throws Exception { 111 Path path = UTIL.getDataTestDir("wal"); 112 ProtobufLogWriter writer = new ProtobufLogWriter(); 113 writer.init(FS, path, UTIL.getConfiguration(), false, FS.getDefaultBlockSize(path), null); 114 for (int i = 0; i < Byte.MAX_VALUE; i++) { 115 WALEdit edit = new WALEdit(); 116 WALEditInternalHelper.addExtendedCell(edit, 117 ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) 118 .setRow(Bytes.toBytes(i)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier-" + i)) 119 .setValue(Bytes.toBytes("v-" + i)).build()); 120 writer.append(new WAL.Entry(new WALKeyImpl(REGION_INFO.getEncodedNameAsBytes(), TABLE_NAME, 121 EnvironmentEdgeManager.currentTime(), MVCC, SCOPE), edit)); 122 } 123 124 WALEdit edit2 = new WALEdit(); 125 WALEditInternalHelper.addExtendedCell(edit2, 126 ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) 127 .setRow(Bytes.toBytes(-1)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier")) 128 .setValue(Bytes.toBytes("vv")).build()); 129 WALEditInternalHelper.addExtendedCell(edit2, 130 ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) 131 .setRow(Bytes.toBytes(-1)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier-1")) 132 .setValue(Bytes.toBytes("vvv")).build()); 133 writer.append(new WAL.Entry(new WALKeyImpl(REGION_INFO.getEncodedNameAsBytes(), TABLE_NAME, 134 EnvironmentEdgeManager.currentTime(), MVCC, SCOPE), edit2)); 135 writer.sync(false); 136 long offset = writer.getSyncedLength(); 137 writer.close(); 138 return Pair.newPair(path, offset); 139 } 140 141 @BeforeClass 142 public static void setUp() throws Exception { 143 Configuration conf = UTIL.getConfiguration(); 144 FS = UTIL.getTestFileSystem(); 145 conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); 146 conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false); 147 conf.setInt("replication.source.maxretriesmultiplier", 1); 148 FS.mkdirs(UTIL.getDataTestDir()); 149 Pair<Path, Long> pair = generateWAL(); 150 TEMPLATE_WAL_FILE = pair.getFirst(); 151 END_OFFSET_OF_WAL_ENTRIES = pair.getSecond().intValue(); 152 WAL_FILE = UTIL.getDataTestDir("rep_source"); 153 154 METRICS_SOURCE = new MetricsSource("reset"); 155 SOURCE = mock(ReplicationSource.class); 156 when(SOURCE.isPeerEnabled()).thenReturn(true); 157 when(SOURCE.getWALFileLengthProvider()).thenReturn(p -> OptionalLong.of(WAL_LENGTH)); 158 when(SOURCE.getServerWALsBelongTo()) 159 .thenReturn(ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime())); 160 when(SOURCE.getSourceMetrics()).thenReturn(METRICS_SOURCE); 161 ReplicationSourceManager rsm = new ReplicationSourceManager(null, null, conf, null, null, null, 162 null, null, null, null, mock(MetricsReplicationGlobalSourceSource.class)); 163 when(SOURCE.getSourceManager()).thenReturn(rsm); 164 165 LOG_QUEUE = new ReplicationSourceLogQueue(conf, METRICS_SOURCE, SOURCE); 166 LOG_QUEUE.enqueueLog(WAL_FILE, GROUP_ID); 167 READER = new ReplicationSourceWALReader(FS, conf, LOG_QUEUE, 0, e -> e, SOURCE, GROUP_ID); 168 } 169 170 @AfterClass 171 public static void tearDown() throws Exception { 172 READER.setReaderRunning(false); 173 READER.join(); 174 UTIL.cleanupTestDir(); 175 } 176 177 private void test(byte[] content, FSDataOutputStream out) throws Exception { 178 // minus 15 so the second entry is incomplete 179 // 15 is a magic number here, we want the reader parse the first cell but not the second cell, 180 // especially not the qualifier of the second cell. The value of the second cell is 'vvv', which 181 // is 3 bytes, plus 8 bytes timestamp, and also qualifier, family and row(which should have been 182 // compressed), so 15 is a proper value, of course 14 or 16 could also work here. 183 out.write(content, 0, END_OFFSET_OF_WAL_ENTRIES - 15); 184 out.hflush(); 185 WAL_LENGTH = END_OFFSET_OF_WAL_ENTRIES - 15; 186 READER.start(); 187 List<WAL.Entry> entries = new ArrayList<>(); 188 for (;;) { 189 WALEntryBatch batch = READER.poll(1000); 190 if (batch == null) { 191 break; 192 } 193 entries.addAll(batch.getWalEntries()); 194 } 195 // should return all the entries except the last one 196 assertEquals(Byte.MAX_VALUE, entries.size()); 197 for (int i = 0; i < Byte.MAX_VALUE; i++) { 198 WAL.Entry entry = entries.get(i); 199 assertEquals(1, entry.getEdit().size()); 200 Cell cell = entry.getEdit().getCells().get(0); 201 assertEquals(i, Bytes.toInt(cell.getRowArray(), cell.getRowOffset())); 202 assertEquals(Bytes.toString(FAMILY), 203 Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())); 204 assertEquals("qualifier-" + i, Bytes.toString(cell.getQualifierArray(), 205 cell.getQualifierOffset(), cell.getQualifierLength())); 206 assertEquals("v-" + i, 207 Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); 208 } 209 210 // confirm that we can not get the last one since it is incomplete 211 assertNull(READER.poll(1000)); 212 // write the last byte out 213 out.write(content, END_OFFSET_OF_WAL_ENTRIES - 15, 15); 214 out.hflush(); 215 WAL_LENGTH = END_OFFSET_OF_WAL_ENTRIES; 216 217 // should get the last entry 218 WALEntryBatch batch = READER.poll(10000); 219 assertEquals(1, batch.getNbEntries()); 220 WAL.Entry entry = batch.getWalEntries().get(0); 221 assertEquals(2, entry.getEdit().size()); 222 Cell cell2 = entry.getEdit().getCells().get(0); 223 assertEquals(-1, Bytes.toInt(cell2.getRowArray(), cell2.getRowOffset())); 224 assertEquals(Bytes.toString(FAMILY), 225 Bytes.toString(cell2.getFamilyArray(), cell2.getFamilyOffset(), cell2.getFamilyLength())); 226 assertEquals("qualifier", Bytes.toString(cell2.getQualifierArray(), cell2.getQualifierOffset(), 227 cell2.getQualifierLength())); 228 assertEquals("vv", 229 Bytes.toString(cell2.getValueArray(), cell2.getValueOffset(), cell2.getValueLength())); 230 231 Cell cell3 = entry.getEdit().getCells().get(1); 232 assertEquals(-1, Bytes.toInt(cell3.getRowArray(), cell3.getRowOffset())); 233 assertEquals(Bytes.toString(FAMILY), 234 Bytes.toString(cell3.getFamilyArray(), cell3.getFamilyOffset(), cell3.getFamilyLength())); 235 assertEquals("qualifier-1", Bytes.toString(cell3.getQualifierArray(), 236 cell3.getQualifierOffset(), cell3.getQualifierLength())); 237 assertEquals("vvv", 238 Bytes.toString(cell3.getValueArray(), cell3.getValueOffset(), cell3.getValueLength())); 239 } 240 241 @Test 242 public void testReset() throws Exception { 243 byte[] content; 244 try (FSDataInputStream in = FS.open(TEMPLATE_WAL_FILE)) { 245 content = ByteStreams.toByteArray(in); 246 } 247 try (FSDataOutputStream out = FS.create(WAL_FILE)) { 248 test(content, out); 249 } 250 } 251}