001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.DataInputStream; 027import java.io.DataOutputStream; 028import java.io.IOException; 029import java.util.List; 030import java.util.Random; 031import java.util.concurrent.ThreadLocalRandom; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FSDataInputStream; 034import org.apache.hadoop.fs.FSDataOutputStream; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.ExtendedCell; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 040import org.apache.hadoop.hbase.HBaseTestingUtil; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.KeyValue; 043import org.apache.hadoop.hbase.KeyValueUtil; 044import org.apache.hadoop.hbase.io.ByteBuffAllocator; 045import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 046import org.apache.hadoop.hbase.io.compress.Compression; 047import org.apache.hadoop.hbase.io.crypto.Cipher; 048import org.apache.hadoop.hbase.io.crypto.Encryption; 049import org.apache.hadoop.hbase.io.crypto.MockAesKeyProvider; 050import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 051import org.apache.hadoop.hbase.testclassification.IOTests; 052import org.apache.hadoop.hbase.testclassification.SmallTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.RedundantKVGenerator; 055import org.junit.BeforeClass; 056import org.junit.ClassRule; 057import org.junit.Test; 058import org.junit.experimental.categories.Category; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062@Category({ IOTests.class, SmallTests.class }) 063public class TestHFileEncryption { 064 065 @ClassRule 066 public static final HBaseClassTestRule CLASS_RULE = 067 HBaseClassTestRule.forClass(TestHFileEncryption.class); 068 069 private static final Logger LOG = LoggerFactory.getLogger(TestHFileEncryption.class); 070 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 071 072 private static FileSystem fs; 073 private static Encryption.Context cryptoContext; 074 075 @BeforeClass 076 public static void setUp() throws Exception { 077 Configuration conf = TEST_UTIL.getConfiguration(); 078 // Disable block cache in this test. 079 conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); 080 conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, MockAesKeyProvider.class.getName()); 081 conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); 082 conf.setInt("hfile.format.version", 3); 083 084 fs = FileSystem.get(conf); 085 086 cryptoContext = Encryption.newContext(conf); 087 String algorithm = conf.get(HConstants.CRYPTO_KEY_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES); 088 Cipher aes = Encryption.getCipher(conf, algorithm); 089 assertNotNull(aes); 090 cryptoContext.setCipher(aes); 091 byte[] key = new byte[aes.getKeyLength()]; 092 Bytes.secureRandom(key); 093 cryptoContext.setKey(key); 094 } 095 096 private int writeBlock(Configuration conf, FSDataOutputStream os, HFileContext fileContext, 097 int size) throws IOException { 098 HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, fileContext); 099 DataOutputStream dos = hbw.startWriting(BlockType.DATA); 100 for (int j = 0; j < size; j++) { 101 dos.writeInt(j); 102 } 103 hbw.writeHeaderAndData(os); 104 LOG.info("Wrote a block at " + os.getPos() + " with" + " onDiskSizeWithHeader=" 105 + hbw.getOnDiskSizeWithHeader() + " uncompressedSizeWithoutHeader=" 106 + hbw.getOnDiskSizeWithoutHeader() + " uncompressedSizeWithoutHeader=" 107 + hbw.getUncompressedSizeWithoutHeader()); 108 return hbw.getOnDiskSizeWithHeader(); 109 } 110 111 private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderImpl hbr, int size) 112 throws IOException { 113 HFileBlock b = hbr.readBlockData(pos, -1, false, false, true); 114 assertEquals(0, HFile.getAndResetChecksumFailuresCount()); 115 b.sanityCheck(); 116 assertFalse( 117 (b.getHFileContext().getCompression() != Compression.Algorithm.NONE) && b.isUnpacked()); 118 b = b.unpack(ctx, hbr); 119 LOG.info( 120 "Read a block at " + pos + " with" + " onDiskSizeWithHeader=" + b.getOnDiskSizeWithHeader() 121 + " uncompressedSizeWithoutHeader=" + b.getOnDiskSizeWithoutHeader() 122 + " uncompressedSizeWithoutHeader=" + b.getUncompressedSizeWithoutHeader()); 123 DataInputStream dis = b.getByteStream(); 124 for (int i = 0; i < size; i++) { 125 int read = dis.readInt(); 126 if (read != i) { 127 fail("Block data corrupt at element " + i); 128 } 129 } 130 return b.getOnDiskSizeWithHeader(); 131 } 132 133 @Test 134 public void testDataBlockEncryption() throws IOException { 135 final int blocks = 10; 136 final int[] blockSizes = new int[blocks]; 137 final Random rand = ThreadLocalRandom.current(); 138 for (int i = 0; i < blocks; i++) { 139 blockSizes[i] = (1024 + rand.nextInt(1024 * 63)) / Bytes.SIZEOF_INT; 140 } 141 for (Compression.Algorithm compression : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) { 142 Path path = new Path(TEST_UTIL.getDataTestDir(), "block_v3_" + compression + "_AES"); 143 LOG.info("testDataBlockEncryption: encryption=AES compression=" + compression); 144 long totalSize = 0; 145 HFileContext fileContext = new HFileContextBuilder().withCompression(compression) 146 .withEncryptionContext(cryptoContext).build(); 147 FSDataOutputStream os = fs.create(path); 148 try { 149 for (int i = 0; i < blocks; i++) { 150 totalSize += writeBlock(TEST_UTIL.getConfiguration(), os, fileContext, blockSizes[i]); 151 } 152 } finally { 153 os.close(); 154 } 155 FSDataInputStream is = fs.open(path); 156 ReaderContext context = 157 new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is)) 158 .withFilePath(path).withFileSystem(fs).withFileSize(totalSize).build(); 159 try { 160 HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, fileContext, 161 ByteBuffAllocator.HEAP, TEST_UTIL.getConfiguration()); 162 long pos = 0; 163 for (int i = 0; i < blocks; i++) { 164 pos += readAndVerifyBlock(pos, fileContext, hbr, blockSizes[i]); 165 } 166 } finally { 167 is.close(); 168 } 169 } 170 } 171 172 @Test 173 public void testHFileEncryptionMetadata() throws Exception { 174 Configuration conf = TEST_UTIL.getConfiguration(); 175 CacheConfig cacheConf = new CacheConfig(conf); 176 HFileContext fileContext = 177 new HFileContextBuilder().withEncryptionContext(cryptoContext).build(); 178 179 // write a simple encrypted hfile 180 Path path = new Path(TEST_UTIL.getDataTestDir(), "cryptometa.hfile"); 181 FSDataOutputStream out = fs.create(path); 182 HFile.Writer writer = HFile.getWriterFactory(conf, cacheConf).withOutputStream(out) 183 .withFileContext(fileContext).create(); 184 try { 185 KeyValue kv = 186 new KeyValue(Bytes.toBytes("foo"), Bytes.toBytes("f1"), null, Bytes.toBytes("value")); 187 writer.append(kv); 188 } finally { 189 writer.close(); 190 out.close(); 191 } 192 193 // read it back in and validate correct crypto metadata 194 HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf); 195 try { 196 FixedFileTrailer trailer = reader.getTrailer(); 197 assertNotNull(trailer.getEncryptionKey()); 198 Encryption.Context readerContext = reader.getFileContext().getEncryptionContext(); 199 assertEquals(readerContext.getCipher().getName(), cryptoContext.getCipher().getName()); 200 assertTrue(Bytes.equals(readerContext.getKeyBytes(), cryptoContext.getKeyBytes())); 201 } finally { 202 reader.close(); 203 } 204 } 205 206 @Test 207 public void testHFileEncryption() throws Exception { 208 // Create 1000 random test KVs 209 RedundantKVGenerator generator = new RedundantKVGenerator(); 210 List<KeyValue> testKvs = generator.generateTestKeyValues(1000); 211 212 // Iterate through data block encoding and compression combinations 213 Configuration conf = TEST_UTIL.getConfiguration(); 214 CacheConfig cacheConf = new CacheConfig(conf); 215 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 216 for (Compression.Algorithm compression : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) { 217 HFileContext fileContext = new HFileContextBuilder().withBlockSize(4096) // small blocks 218 .withEncryptionContext(cryptoContext).withCompression(compression) 219 .withDataBlockEncoding(encoding).build(); 220 // write a new test HFile 221 LOG.info("Writing with " + fileContext); 222 Path path = new Path(TEST_UTIL.getDataTestDir(), 223 HBaseCommonTestingUtil.getRandomUUID().toString() + ".hfile"); 224 FSDataOutputStream out = fs.create(path); 225 HFile.Writer writer = HFile.getWriterFactory(conf, cacheConf).withOutputStream(out) 226 .withFileContext(fileContext).create(); 227 try { 228 for (KeyValue kv : testKvs) { 229 writer.append(kv); 230 } 231 } finally { 232 writer.close(); 233 out.close(); 234 } 235 236 // read it back in 237 LOG.info("Reading with " + fileContext); 238 int i = 0; 239 HFileScanner scanner = null; 240 HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf); 241 try { 242 FixedFileTrailer trailer = reader.getTrailer(); 243 assertNotNull(trailer.getEncryptionKey()); 244 scanner = reader.getScanner(conf, false, false); 245 assertTrue("Initial seekTo failed", scanner.seekTo()); 246 do { 247 ExtendedCell kv = scanner.getCell(); 248 assertTrue("Read back an unexpected or invalid KV", 249 testKvs.contains(KeyValueUtil.ensureKeyValue(kv))); 250 i++; 251 } while (scanner.next()); 252 } finally { 253 reader.close(); 254 scanner.close(); 255 } 256 257 assertEquals("Did not read back as many KVs as written", i, testKvs.size()); 258 259 // Test random seeks with pread 260 LOG.info("Random seeking with " + fileContext); 261 Random rand = ThreadLocalRandom.current(); 262 reader = HFile.createReader(fs, path, cacheConf, true, conf); 263 try { 264 scanner = reader.getScanner(conf, false, true); 265 assertTrue("Initial seekTo failed", scanner.seekTo()); 266 for (i = 0; i < 100; i++) { 267 KeyValue kv = testKvs.get(rand.nextInt(testKvs.size())); 268 assertEquals("Unable to find KV as expected: " + kv, 0, scanner.seekTo(kv)); 269 } 270 } finally { 271 scanner.close(); 272 reader.close(); 273 } 274 } 275 } 276 } 277 278}