/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyStoreFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, SmallTests.class })
public class TestDateTieredCompactor {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestDateTieredCompactor.class);

  private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");

  private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);

  private static final KeyValue KV_A = new KeyValue(Bytes.toBytes("aaa"), 100L);

  private static final KeyValue KV_B = new KeyValue(Bytes.toBytes("bbb"), 200L);

  private static final KeyValue KV_C = new KeyValue(Bytes.toBytes("ccc"), 300L);

  private static final KeyValue KV_D = new KeyValue(Bytes.toBytes("ddd"), 400L);

  @Parameters(name = "{index}: usePrivateReaders={0}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[] { true }, new Object[] { false });
  }

  @Parameter
  public boolean usePrivateReaders;

  private DateTieredCompactor createCompactor(StoreFileWritersCapture writers,
    final KeyValue[] input, List<HStoreFile> storefiles) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
    final Scanner scanner = new Scanner(input);
    // Create store mock that is satisfactory for compactor.
    ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.of(NAME_OF_THINGS);
    ScanInfo si =
      new ScanInfo(conf, familyDescriptor, Long.MAX_VALUE, 0, CellComparatorImpl.COMPARATOR);
    HStore store = mock(HStore.class);
    when(store.getStorefiles()).thenReturn(storefiles);
    when(store.getColumnFamilyDescriptor()).thenReturn(familyDescriptor);
    when(store.getScanInfo()).thenReturn(si);
    when(store.areWritesEnabled()).thenReturn(true);
    when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
    when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
    // The writers capture answers createWriter, so every writer the compactor opens is recorded.
    StoreEngine storeEngine = mock(StoreEngine.class);
    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
    when(store.getStoreEngine()).thenReturn(storeEngine);
    when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
    OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
    when(store.getMaxSequenceId()).thenReturn(maxSequenceId);

    // Override scanner creation so the compactor reads the canned input instead of real files.
    return new DateTieredCompactor(conf, store) {
      @Override
      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
        List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
        byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
        return scanner;
      }

      @Override
      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
        List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
        long earliestPutTs) throws IOException {
        return scanner;
      }
    };
  }

  private void verify(KeyValue[] input, List<Long> boundaries, KeyValue[][] output,
    boolean allFiles) throws Exception {
    StoreFileWritersCapture writers = new StoreFileWritersCapture();
    HStoreFile sf1 = createDummyStoreFile(1L);
    HStoreFile sf2 = createDummyStoreFile(2L);
    DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2));
    List<Path> paths = dtc.compact(new CompactionRequestImpl(Arrays.asList(sf1)),
      boundaries.subList(0, boundaries.size() - 1), new HashMap<Long, String>(),
      NoLimitThroughputController.INSTANCE, null);
    writers.verifyKvs(output, allFiles, boundaries);
    if (allFiles) {
      assertEquals(output.length, paths.size());
    }
  }

  @SuppressWarnings("unchecked")
  private static <T> T[] a(T... a) {
    return a;
  }

  @Test
  public void test() throws Exception {
    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(100L, 200L, 300L, 400L, 500L),
      a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)), true);
    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, 200L, Long.MAX_VALUE),
      a(a(KV_A), a(KV_B, KV_C, KV_D)), false);
    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
      new KeyValue[][] { a(KV_A, KV_B, KV_C, KV_D) }, false);
  }

  @Test
  public void testEmptyOutputFile() throws Exception {
    StoreFileWritersCapture writers = new StoreFileWritersCapture();
    CompactionRequestImpl request = createDummyRequest();
    DateTieredCompactor dtc =
      createCompactor(writers, new KeyValue[0], new ArrayList<>(request.getFiles()));
    List<Path> paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
      new HashMap<Long, String>(), NoLimitThroughputController.INSTANCE, null);
    assertEquals(1, paths.size());
    List<StoreFileWritersCapture.Writer> dummyWriters = writers.getWriters();
    assertEquals(1, dummyWriters.size());
    StoreFileWritersCapture.Writer dummyWriter = dummyWriters.get(0);
    assertTrue(dummyWriter.kvs.isEmpty());
    assertTrue(dummyWriter.hasMetadata);
  }
}