001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.ByteArrayInputStream; 024import java.io.ByteArrayOutputStream; 025import java.io.InputStream; 026import java.nio.ByteBuffer; 027import java.util.ArrayList; 028import java.util.List; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.ArrayBackedTag; 031import org.apache.hadoop.hbase.ByteBufferKeyValue; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.KeyValue; 036import org.apache.hadoop.hbase.PrivateCellUtil; 037import org.apache.hadoop.hbase.Tag; 038import org.apache.hadoop.hbase.codec.Codec.Decoder; 039import org.apache.hadoop.hbase.codec.Codec.Encoder; 040import org.apache.hadoop.hbase.io.compress.Compression; 041import org.apache.hadoop.hbase.io.util.LRUDictionary; 042import org.apache.hadoop.hbase.testclassification.RegionServerTests; 043import org.apache.hadoop.hbase.testclassification.SmallTests; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.junit.ClassRule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048import org.junit.runner.RunWith; 049import org.junit.runners.Parameterized; 050import org.junit.runners.Parameterized.Parameters; 051 052@Category({ RegionServerTests.class, SmallTests.class }) 053@RunWith(Parameterized.class) 054public class TestWALCellCodecWithCompression { 055 056 @ClassRule 057 public static final HBaseClassTestRule CLASS_RULE = 058 HBaseClassTestRule.forClass(TestWALCellCodecWithCompression.class); 059 060 private Compression.Algorithm compression; 061 062 public TestWALCellCodecWithCompression(Compression.Algorithm algo) { 063 this.compression = algo; 064 } 065 066 @Parameters 067 public static List<Object[]> params() { 068 return HBaseTestingUtility.COMPRESSION_ALGORITHMS_PARAMETERIZED; 069 } 070 071 @Test 072 public void testEncodeDecodeKVsWithTags() throws Exception { 073 doTest(false, false); 074 } 075 076 @Test 077 public void testEncodeDecodeKVsWithTagsWithTagsCompression() throws Exception { 078 doTest(true, false); 079 } 080 081 @Test 082 public void testEncodeDecodeOffKVsWithTagsWithTagsCompression() throws Exception { 083 doTest(true, false); 084 } 085 086 @Test 087 public void testValueCompressionEnabled() throws Exception { 088 doTest(false, true); 089 } 090 091 @Test 092 public void testValueCompression() throws Exception { 093 final byte[] row_1 = Bytes.toBytes("row_1"); 094 final byte[] value_1 = new byte[20]; 095 Bytes.zero(value_1); 096 final byte[] row_2 = Bytes.toBytes("row_2"); 097 final byte[] value_2 = new byte[Bytes.SIZEOF_LONG]; 098 Bytes.random(value_2); 099 final byte[] row_3 = Bytes.toBytes("row_3"); 100 final byte[] value_3 = new byte[100]; 101 Bytes.random(value_3); 102 final byte[] row_4 = Bytes.toBytes("row_4"); 103 final byte[] value_4 = new byte[128]; 104 fillBytes(value_4, Bytes.toBytes("DEADBEEF")); 105 final byte[] row_5 = Bytes.toBytes("row_5"); 106 final byte[] value_5 = new byte[64]; 107 fillBytes(value_5, Bytes.toBytes("CAFEBABE")); 108 109 Configuration conf = new Configuration(false); 110 WALCellCodec codec = new WALCellCodec(conf, 111 new CompressionContext(LRUDictionary.class, false, true, true, compression)); 112 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 113 Encoder encoder = codec.getEncoder(bos); 114 encoder.write(createKV(row_1, value_1, 0)); 115 encoder.write(createKV(row_2, value_2, 0)); 116 encoder.write(createKV(row_3, value_3, 0)); 117 encoder.write(createKV(row_4, value_4, 0)); 118 encoder.write(createKV(row_5, value_5, 0)); 119 encoder.flush(); 120 try (InputStream is = new ByteArrayInputStream(bos.toByteArray())) { 121 Decoder decoder = codec.getDecoder(is); 122 decoder.advance(); 123 KeyValue kv = (KeyValue) decoder.current(); 124 assertTrue(Bytes.equals(row_1, 0, row_1.length, kv.getRowArray(), kv.getRowOffset(), 125 kv.getRowLength())); 126 assertTrue(Bytes.equals(value_1, 0, value_1.length, kv.getValueArray(), kv.getValueOffset(), 127 kv.getValueLength())); 128 decoder.advance(); 129 kv = (KeyValue) decoder.current(); 130 assertTrue(Bytes.equals(row_2, 0, row_2.length, kv.getRowArray(), kv.getRowOffset(), 131 kv.getRowLength())); 132 assertTrue(Bytes.equals(value_2, 0, value_2.length, kv.getValueArray(), kv.getValueOffset(), 133 kv.getValueLength())); 134 decoder.advance(); 135 kv = (KeyValue) decoder.current(); 136 assertTrue(Bytes.equals(row_3, 0, row_3.length, kv.getRowArray(), kv.getRowOffset(), 137 kv.getRowLength())); 138 assertTrue(Bytes.equals(value_3, 0, value_3.length, kv.getValueArray(), kv.getValueOffset(), 139 kv.getValueLength())); 140 decoder.advance(); 141 kv = (KeyValue) decoder.current(); 142 assertTrue(Bytes.equals(row_4, 0, row_4.length, kv.getRowArray(), kv.getRowOffset(), 143 kv.getRowLength())); 144 assertTrue(Bytes.equals(value_4, 0, value_4.length, kv.getValueArray(), kv.getValueOffset(), 145 kv.getValueLength())); 146 decoder.advance(); 147 kv = (KeyValue) decoder.current(); 148 assertTrue(Bytes.equals(row_5, 0, row_5.length, kv.getRowArray(), kv.getRowOffset(), 149 kv.getRowLength())); 150 assertTrue(Bytes.equals(value_5, 0, value_5.length, kv.getValueArray(), kv.getValueOffset(), 151 kv.getValueLength())); 152 } 153 } 154 155 static void fillBytes(byte[] buffer, byte[] fill) { 156 int offset = 0; 157 int remaining = buffer.length; 158 while (remaining > 0) { 159 int len = remaining < fill.length ? remaining : fill.length; 160 System.arraycopy(fill, 0, buffer, offset, len); 161 offset += len; 162 remaining -= len; 163 } 164 } 165 166 private void doTest(boolean compressTags, boolean offheapKV) throws Exception { 167 final byte[] key = Bytes.toBytes("myRow"); 168 final byte[] value = Bytes.toBytes("myValue"); 169 Configuration conf = new Configuration(false); 170 conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags); 171 WALCellCodec codec = 172 new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false, compressTags)); 173 ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); 174 Encoder encoder = codec.getEncoder(bos); 175 if (offheapKV) { 176 encoder.write(createOffheapKV(key, value, 1)); 177 encoder.write(createOffheapKV(key, value, 0)); 178 encoder.write(createOffheapKV(key, value, 2)); 179 } else { 180 encoder.write(createKV(key, value, 1)); 181 encoder.write(createKV(key, value, 0)); 182 encoder.write(createKV(key, value, 2)); 183 } 184 185 InputStream is = new ByteArrayInputStream(bos.toByteArray()); 186 Decoder decoder = codec.getDecoder(is); 187 decoder.advance(); 188 KeyValue kv = (KeyValue) decoder.current(); 189 List<Tag> tags = PrivateCellUtil.getTags(kv); 190 assertEquals(1, tags.size()); 191 assertEquals("tagValue1", Bytes.toString(Tag.cloneValue(tags.get(0)))); 192 decoder.advance(); 193 kv = (KeyValue) decoder.current(); 194 tags = PrivateCellUtil.getTags(kv); 195 assertEquals(0, tags.size()); 196 decoder.advance(); 197 kv = (KeyValue) decoder.current(); 198 tags = PrivateCellUtil.getTags(kv); 199 assertEquals(2, tags.size()); 200 assertEquals("tagValue1", Bytes.toString(Tag.cloneValue(tags.get(0)))); 201 assertEquals("tagValue2", Bytes.toString(Tag.cloneValue(tags.get(1)))); 202 } 203 204 private KeyValue createKV(byte[] row, byte[] value, int noOfTags) { 205 byte[] cf = Bytes.toBytes("myCF"); 206 byte[] q = Bytes.toBytes("myQualifier"); 207 List<Tag> tags = new ArrayList<>(noOfTags); 208 for (int i = 1; i <= noOfTags; i++) { 209 tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i))); 210 } 211 return new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags); 212 } 213 214 private ByteBufferKeyValue createOffheapKV(byte[] row, byte[] value, int noOfTags) { 215 byte[] cf = Bytes.toBytes("myCF"); 216 byte[] q = Bytes.toBytes("myQualifier"); 217 List<Tag> tags = new ArrayList<>(noOfTags); 218 for (int i = 1; i <= noOfTags; i++) { 219 tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i))); 220 } 221 KeyValue kv = new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags); 222 ByteBuffer dbb = ByteBuffer.allocateDirect(kv.getBuffer().length); 223 dbb.put(kv.getBuffer()); 224 return new ByteBufferKeyValue(dbb, 0, kv.getBuffer().length); 225 } 226}