001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.compactions;
019
020import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
021import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyStoreFile;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertTrue;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.HashMap;
032import java.util.List;
033import java.util.OptionalLong;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.CellComparatorImpl;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.HColumnDescriptor;
041import org.apache.hadoop.hbase.KeyValue;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.RegionInfoBuilder;
044import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
045import org.apache.hadoop.hbase.regionserver.HStore;
046import org.apache.hadoop.hbase.regionserver.HStoreFile;
047import org.apache.hadoop.hbase.regionserver.InternalScanner;
048import org.apache.hadoop.hbase.regionserver.ScanInfo;
049import org.apache.hadoop.hbase.regionserver.ScanType;
050import org.apache.hadoop.hbase.regionserver.StoreEngine;
051import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
052import org.apache.hadoop.hbase.regionserver.StoreUtils;
053import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
054import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
055import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
056import org.apache.hadoop.hbase.testclassification.RegionServerTests;
057import org.apache.hadoop.hbase.testclassification.SmallTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.junit.ClassRule;
060import org.junit.Test;
061import org.junit.experimental.categories.Category;
062import org.junit.runner.RunWith;
063import org.junit.runners.Parameterized;
064import org.junit.runners.Parameterized.Parameter;
065import org.junit.runners.Parameterized.Parameters;
066
067@RunWith(Parameterized.class)
068@Category({ RegionServerTests.class, SmallTests.class })
069public class TestDateTieredCompactor {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073    HBaseClassTestRule.forClass(TestDateTieredCompactor.class);
074
075  private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
076
077  private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
078
079  private static final KeyValue KV_A = new KeyValue(Bytes.toBytes("aaa"), 100L);
080
081  private static final KeyValue KV_B = new KeyValue(Bytes.toBytes("bbb"), 200L);
082
083  private static final KeyValue KV_C = new KeyValue(Bytes.toBytes("ccc"), 300L);
084
085  private static final KeyValue KV_D = new KeyValue(Bytes.toBytes("ddd"), 400L);
086
087  @Parameters(name = "{index}: usePrivateReaders={0}")
088  public static Iterable<Object[]> data() {
089    return Arrays.asList(new Object[] { true }, new Object[] { false });
090  }
091
092  @Parameter
093  public boolean usePrivateReaders;
094
095  private DateTieredCompactor createCompactor(StoreFileWritersCapture writers,
096    final KeyValue[] input, List<HStoreFile> storefiles) throws Exception {
097    Configuration conf = HBaseConfiguration.create();
098    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
099    final Scanner scanner = new Scanner(input);
100    // Create store mock that is satisfactory for compactor.
101    HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
102    ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, CellComparatorImpl.COMPARATOR);
103    HStore store = mock(HStore.class);
104    when(store.getStorefiles()).thenReturn(storefiles);
105    when(store.getColumnFamilyDescriptor()).thenReturn(col);
106    when(store.getScanInfo()).thenReturn(si);
107    when(store.areWritesEnabled()).thenReturn(true);
108    when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
109    when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
110    StoreEngine storeEngine = mock(StoreEngine.class);
111    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
112    when(store.getStoreEngine()).thenReturn(storeEngine);
113    when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
114    OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
115    when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
116
117    return new DateTieredCompactor(conf, store) {
118      @Override
119      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
120        List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
121        byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
122        return scanner;
123      }
124
125      @Override
126      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
127        List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
128        long earliestPutTs) throws IOException {
129        return scanner;
130      }
131    };
132  }
133
134  private void verify(KeyValue[] input, List<Long> boundaries, KeyValue[][] output,
135    boolean allFiles) throws Exception {
136    StoreFileWritersCapture writers = new StoreFileWritersCapture();
137    HStoreFile sf1 = createDummyStoreFile(1L);
138    HStoreFile sf2 = createDummyStoreFile(2L);
139    DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2));
140    List<Path> paths = dtc.compact(new CompactionRequestImpl(Arrays.asList(sf1)),
141      boundaries.subList(0, boundaries.size() - 1), new HashMap<Long, String>(),
142      NoLimitThroughputController.INSTANCE, null);
143    writers.verifyKvs(output, allFiles, boundaries);
144    if (allFiles) {
145      assertEquals(output.length, paths.size());
146    }
147  }
148
149  @SuppressWarnings("unchecked")
150  private static <T> T[] a(T... a) {
151    return a;
152  }
153
154  @Test
155  public void test() throws Exception {
156    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(100L, 200L, 300L, 400L, 500L),
157      a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)), true);
158    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, 200L, Long.MAX_VALUE),
159      a(a(KV_A), a(KV_B, KV_C, KV_D)), false);
160    verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
161      new KeyValue[][] { a(KV_A, KV_B, KV_C, KV_D) }, false);
162  }
163
164  @Test
165  public void testEmptyOutputFile() throws Exception {
166    StoreFileWritersCapture writers = new StoreFileWritersCapture();
167    CompactionRequestImpl request = createDummyRequest();
168    DateTieredCompactor dtc =
169      createCompactor(writers, new KeyValue[0], new ArrayList<>(request.getFiles()));
170    List<Path> paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
171      new HashMap<Long, String>(), NoLimitThroughputController.INSTANCE, null);
172    assertEquals(1, paths.size());
173    List<StoreFileWritersCapture.Writer> dummyWriters = writers.getWriters();
174    assertEquals(1, dummyWriters.size());
175    StoreFileWritersCapture.Writer dummyWriter = dummyWriters.get(0);
176    assertTrue(dummyWriter.kvs.isEmpty());
177    assertTrue(dummyWriter.hasMetadata);
178  }
179}