/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

/**
 * Tests for {@link StripeCompactor}. Runs both boundary-driven and size-driven stripe
 * compactions against a mocked {@link HStore}, capturing the created store-file writers
 * with {@link StoreFileWritersCapture} and verifying which KVs land in which output file
 * and which stripe boundaries each file is tagged with. Parameterized on the
 * "private readers" compaction setting.
 */
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, SmallTests.class })
public class TestStripeCompactor {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestStripeCompactor.class);

  private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
  private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);

  // Stripe boundary row keys used throughout the tests.
  private static final byte[] KEY_B = Bytes.toBytes("bbb");
  private static final byte[] KEY_C = Bytes.toBytes("ccc");
  private static final byte[] KEY_D = Bytes.toBytes("ddd");

  // KVs whose rows sort immediately AFTER the corresponding boundary key, so each KV
  // falls inside the stripe that starts at that boundary (see kvAfter).
  private static final KeyValue KV_A = kvAfter(Bytes.toBytes("aaa"));
  private static final KeyValue KV_B = kvAfter(KEY_B);
  private static final KeyValue KV_C = kvAfter(KEY_C);
  private static final KeyValue KV_D = kvAfter(KEY_D);

  @Parameters(name = "{index}: usePrivateReaders={0}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[] { true }, new Object[] { false });
  }

  // Injected by the Parameterized runner; toggles
  // "hbase.regionserver.compaction.private.readers" in createCompactor().
  @Parameter
  public boolean usePrivateReaders;

  /** Returns a KV whose row is {@code key} plus one extra (zero) byte, i.e. the row
   * sorting immediately after {@code key}. */
  private static KeyValue kvAfter(byte[] key) {
    return new KeyValue(Arrays.copyOf(key, key.length + 1), 0L);
  }

  /** Terse varargs array factory to keep the test-case tables readable. */
  @SuppressWarnings("unchecked")
  private static <T> T[] a(T... a) {
    return a;
  }

  /** Shorthand for an empty KeyValue array. */
  private static KeyValue[] e() {
    return TestStripeCompactor.<KeyValue> a();
  }

  @Test
  public void testBoundaryCompactions() throws Exception {
    // General verification
    verifyBoundaryCompaction(a(KV_A, KV_A, KV_B, KV_B, KV_C, KV_D),
      a(OPEN_KEY, KEY_B, KEY_D, OPEN_KEY), a(a(KV_A, KV_A), a(KV_B, KV_B, KV_C), a(KV_D)));
    verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_C, KEY_D), a(a(KV_B), a(KV_C)));
    verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_D), new KeyValue[][] { a(KV_B, KV_C) });
  }

  @Test
  public void testBoundaryCompactionEmptyFiles() throws Exception {
    // No empty file if there're already files.
    verifyBoundaryCompaction(a(KV_B), a(KEY_B, KEY_C, KEY_D, OPEN_KEY), a(a(KV_B), null, null),
      null, null, false);
    verifyBoundaryCompaction(a(KV_A, KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D),
      a(a(KV_A), null, a(KV_C)), null, null, false);
    // But should be created if there are no file.
    verifyBoundaryCompaction(e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, null, e()), null,
      null, false);
    // In major range if there's major range.
    verifyBoundaryCompaction(e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, e(), null), KEY_B,
      KEY_C, false);
    verifyBoundaryCompaction(e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(e(), e(), null), OPEN_KEY,
      KEY_C, false);
    // Major range should have files regardless of KVs.
    verifyBoundaryCompaction(a(KV_A), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
      a(a(KV_A), e(), e(), null), KEY_B, KEY_D, false);
    verifyBoundaryCompaction(a(KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
      a(null, null, a(KV_C), e()), KEY_C, OPEN_KEY, false);
  }

  /** Convenience overload: no major range, and all output files are expected to exist. */
  private void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries, KeyValue[][] output)
    throws Exception {
    verifyBoundaryCompaction(input, boundaries, output, null, null, true);
  }

  /**
   * Runs a boundary-based stripe compaction over {@code input} and checks the writer capture.
   * @param input KVs fed to the compactor via the stubbed scanner.
   * @param boundaries stripe boundary keys passed to the compactor.
   * @param output expected KVs per output file; a null entry means no file for that stripe.
   * @param majorFrom start of the major compaction range, or null.
   * @param majorTo end of the major compaction range, or null.
   * @param allFiles whether every stripe is expected to produce a file; if so, path count
   *          and per-file boundary metadata are also verified.
   */
  private void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries, KeyValue[][] output,
    byte[] majorFrom, byte[] majorTo, boolean allFiles) throws Exception {
    StoreFileWritersCapture writers = new StoreFileWritersCapture();
    StripeCompactor sc = createCompactor(writers, input);
    List<Path> paths = sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom,
      majorTo, NoLimitThroughputController.INSTANCE, null);
    writers.verifyKvs(output, allFiles, true);
    if (allFiles) {
      assertEquals(output.length, paths.size());
      writers.verifyBoundaries(boundaries);
    }
  }

  @Test
  public void testSizeCompactions() throws Exception {
    // General verification with different sizes.
    verifySizeCompaction(a(KV_A, KV_A, KV_B, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
      a(a(KV_A, KV_A), a(KV_B, KV_C), a(KV_D)));
    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 4, 1, OPEN_KEY, OPEN_KEY,
      a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)));
    verifySizeCompaction(a(KV_B, KV_C), 2, 1, KEY_B, KEY_D, a(a(KV_B), a(KV_C)));
    // Verify row boundaries are preserved.
    verifySizeCompaction(a(KV_A, KV_A, KV_A, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
      a(a(KV_A, KV_A, KV_A), a(KV_C, KV_D)));
    verifySizeCompaction(a(KV_A, KV_B, KV_B, KV_C), 3, 1, OPEN_KEY, OPEN_KEY,
      a(a(KV_A), a(KV_B, KV_B), a(KV_C)));
    // Too much data, count limits the number of files.
    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 2, 1, OPEN_KEY, OPEN_KEY,
      a(a(KV_A), a(KV_B, KV_C, KV_D)));
    verifySizeCompaction(a(KV_A, KV_B, KV_C), 1, Long.MAX_VALUE, OPEN_KEY, KEY_D,
      new KeyValue[][] { a(KV_A, KV_B, KV_C) });
    // Too little data/large count, no extra files.
    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), Integer.MAX_VALUE, 2, OPEN_KEY, OPEN_KEY,
      a(a(KV_A, KV_B), a(KV_C, KV_D)));
  }

  /**
   * Runs a size-based stripe compaction over {@code input} and checks the writer capture.
   * @param input KVs fed to the compactor via the stubbed scanner.
   * @param targetCount maximum number of output files.
   * @param targetSize target size (in KV count, given the dummy request) per output file.
   * @param left left boundary of the compacted range.
   * @param right right boundary of the compacted range.
   * @param output expected KVs per output file, in order.
   */
  private void verifySizeCompaction(KeyValue[] input, int targetCount, long targetSize, byte[] left,
    byte[] right, KeyValue[][] output) throws Exception {
    StoreFileWritersCapture writers = new StoreFileWritersCapture();
    StripeCompactor sc = createCompactor(writers, input);
    List<Path> paths = sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null,
      null, NoLimitThroughputController.INSTANCE, null);
    assertEquals(output.length, paths.size());
    writers.verifyKvs(output, true, true);
    // Expected boundaries: left edge, then the first row of each file after the first,
    // then the right edge.
    List<byte[]> boundaries = new ArrayList<>(output.length + 2);
    boundaries.add(left);
    for (int i = 1; i < output.length; ++i) {
      boundaries.add(CellUtil.cloneRow(output[i][0]));
    }
    boundaries.add(right);
    writers.verifyBoundaries(boundaries.toArray(new byte[][] {}));
  }

  /**
   * Builds a StripeCompactor over a mocked store whose scanner yields exactly {@code input}
   * and whose writer creation is routed through {@code writers} for later verification.
   */
  private StripeCompactor createCompactor(StoreFileWritersCapture writers, KeyValue[] input)
    throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
    final Scanner scanner = new Scanner(input);

    // Create store mock that is satisfactory for compactor.
    ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.of(NAME_OF_THINGS);
    ScanInfo si =
      new ScanInfo(conf, familyDescriptor, Long.MAX_VALUE, 0, CellComparatorImpl.COMPARATOR);
    HStore store = mock(HStore.class);
    when(store.getColumnFamilyDescriptor()).thenReturn(familyDescriptor);
    when(store.getScanInfo()).thenReturn(si);
    when(store.areWritesEnabled()).thenReturn(true);
    when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
    when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
    // Capture every writer the compactor creates so the tests can inspect the outputs.
    StoreEngine storeEngine = mock(StoreEngine.class);
    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
    when(store.getStoreEngine()).thenReturn(storeEngine);
    when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);

    // Override scanner creation so the compactor reads the canned input instead of real files.
    return new StripeCompactor(conf, store) {
      @Override
      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
        List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
        byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
        return scanner;
      }

      @Override
      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
        List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
        long earliestPutTs) throws IOException {
        return scanner;
      }
    };
  }
}