001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.compactions; 019 020import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MAX_FILES_KEY; 021import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MIN_FILES_KEY; 022import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY; 023import static org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertNotNull; 027import static org.junit.Assert.assertNull; 028import static org.junit.Assert.assertTrue; 029import static org.mockito.AdditionalMatchers.aryEq; 030import static org.mockito.ArgumentMatchers.any; 031import static org.mockito.ArgumentMatchers.anyBoolean; 032import static org.mockito.ArgumentMatchers.anyInt; 033import static org.mockito.ArgumentMatchers.anyLong; 034import static org.mockito.ArgumentMatchers.argThat; 035import static org.mockito.ArgumentMatchers.eq; 036import static org.mockito.ArgumentMatchers.isNull; 037import static org.mockito.Mockito.mock; 038import static org.mockito.Mockito.only; 039import static org.mockito.Mockito.times; 040import static org.mockito.Mockito.verify; 041import static org.mockito.Mockito.when; 042 043import java.io.IOException; 044import java.util.ArrayList; 045import java.util.Arrays; 046import java.util.Collection; 047import java.util.Iterator; 048import java.util.List; 049import java.util.OptionalLong; 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.fs.Path; 052import org.apache.hadoop.hbase.CellComparatorImpl; 053import org.apache.hadoop.hbase.ExtendedCell; 054import org.apache.hadoop.hbase.HBaseClassTestRule; 055import org.apache.hadoop.hbase.HBaseConfiguration; 056import org.apache.hadoop.hbase.KeyValue; 057import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 058import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 059import org.apache.hadoop.hbase.client.RegionInfo; 060import org.apache.hadoop.hbase.client.RegionInfoBuilder; 061import org.apache.hadoop.hbase.io.TimeRange; 062import org.apache.hadoop.hbase.io.hfile.HFile; 063import org.apache.hadoop.hbase.regionserver.BloomType; 064import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams; 065import org.apache.hadoop.hbase.regionserver.HStore; 066import org.apache.hadoop.hbase.regionserver.HStoreFile; 067import org.apache.hadoop.hbase.regionserver.InternalScanner; 068import org.apache.hadoop.hbase.regionserver.ScanInfo; 069import org.apache.hadoop.hbase.regionserver.ScanType; 070import org.apache.hadoop.hbase.regionserver.ScannerContext; 071import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; 072import org.apache.hadoop.hbase.regionserver.StoreEngine; 073import org.apache.hadoop.hbase.regionserver.StoreFileReader; 074import org.apache.hadoop.hbase.regionserver.StoreFileScanner; 075import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; 076import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; 077import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager; 078import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher; 079import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider; 080import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture; 081import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 082import org.apache.hadoop.hbase.testclassification.MediumTests; 083import org.apache.hadoop.hbase.testclassification.RegionServerTests; 084import org.apache.hadoop.hbase.util.Bytes; 085import org.apache.hadoop.hbase.util.ConcatenatedLists; 086import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 087import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 088import org.junit.ClassRule; 089import org.junit.Test; 090import org.junit.experimental.categories.Category; 091import org.junit.runner.RunWith; 092import org.junit.runners.Parameterized; 093import org.junit.runners.Parameterized.Parameter; 094import org.junit.runners.Parameterized.Parameters; 095import org.mockito.ArgumentMatcher; 096 097import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 098import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 099 100@RunWith(Parameterized.class) 101@Category({ RegionServerTests.class, MediumTests.class }) 102public class TestStripeCompactionPolicy { 103 104 @ClassRule 105 public static final HBaseClassTestRule CLASS_RULE = 106 HBaseClassTestRule.forClass(TestStripeCompactionPolicy.class); 107 108 private static final byte[] KEY_A = Bytes.toBytes("aaa"); 109 private static final byte[] KEY_B = Bytes.toBytes("bbb"); 110 private static final byte[] KEY_C = Bytes.toBytes("ccc"); 111 private static final byte[] KEY_D = Bytes.toBytes("ddd"); 112 private static final byte[] KEY_E = Bytes.toBytes("eee"); 113 private static final KeyValue KV_A = new KeyValue(KEY_A, 0L); 114 private static final KeyValue KV_B = new KeyValue(KEY_B, 0L); 115 private static final KeyValue KV_C = new KeyValue(KEY_C, 0L); 116 private static final KeyValue KV_D = new KeyValue(KEY_D, 0L); 117 private static final KeyValue KV_E = new KeyValue(KEY_E, 0L); 118 119 private static long defaultSplitSize = 18; 120 private static float defaultSplitCount = 1.8F; 121 private final static int defaultInitialCount = 1; 122 private static long defaultTtl = 1000 * 1000; 123 124 @Parameters(name = "{index}: usePrivateReaders={0}") 125 public static Iterable<Object[]> data() { 126 return Arrays.asList(new Object[] { true }, new Object[] { false }); 127 } 128 129 @Parameter 130 public boolean usePrivateReaders; 131 132 @Test 133 public void testNoStripesFromFlush() throws Exception { 134 Configuration conf = HBaseConfiguration.create(); 135 conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true); 136 StripeCompactionPolicy policy = createPolicy(conf); 137 StripeInformationProvider si = createStripesL0Only(0, 0); 138 139 KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E }; 140 KeyValue[][] expected = new KeyValue[][] { input }; 141 verifyFlush(policy, si, input, expected, null); 142 } 143 144 @Test 145 public void testOldStripesFromFlush() throws Exception { 146 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create()); 147 StripeInformationProvider si = createStripes(0, KEY_C, KEY_D); 148 149 KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E }; 150 KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B }, 151 new KeyValue[] { KV_C, KV_C }, new KeyValue[] { KV_D, KV_E } }; 152 verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY }); 153 } 154 155 @Test 156 public void testNewStripesFromFlush() throws Exception { 157 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create()); 158 StripeInformationProvider si = createStripesL0Only(0, 0); 159 KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E }; 160 // Starts with one stripe; unlike flush results, must have metadata 161 KeyValue[][] expected = new KeyValue[][] { input }; 162 verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY }); 163 } 164 165 @Test 166 public void testSingleStripeCompaction() throws Exception { 167 // Create a special policy that only compacts single stripes, using standard methods. 168 Configuration conf = HBaseConfiguration.create(); 169 // Test depends on this not being set to pass. Default breaks test. TODO: Revisit. 170 conf.unset("hbase.hstore.compaction.min.size"); 171 conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F); 172 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3); 173 conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4); 174 conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000); // make sure the are no splits 175 StoreConfigInformation sci = mock(StoreConfigInformation.class); 176 when(sci.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO); 177 StripeStoreConfig ssc = new StripeStoreConfig(conf, sci); 178 StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) { 179 @Override 180 public StripeCompactionRequest selectCompaction(StripeInformationProvider si, 181 List<HStoreFile> filesCompacting, boolean isOffpeak) throws IOException { 182 if (!filesCompacting.isEmpty()) { 183 return null; 184 } 185 return selectSingleStripeCompaction(si, false, false, isOffpeak); 186 } 187 188 @Override 189 public boolean needsCompactions(StripeInformationProvider si, 190 List<HStoreFile> filesCompacting) { 191 if (!filesCompacting.isEmpty()) { 192 return false; 193 } 194 return needsSingleStripeCompaction(si); 195 } 196 }; 197 198 // No compaction due to min files or ratio 199 StripeInformationProvider si = 200 createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L }); 201 verifyNoCompaction(policy, si); 202 // No compaction due to min files or ratio - will report needed, but not do any. 203 si = createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L }, 204 new Long[] { 5L, 1L, 1L }); 205 assertNull(policy.selectCompaction(si, al(), false)); 206 assertTrue(policy.needsCompactions(si, al())); 207 // One stripe has possible compaction 208 si = createStripesWithSizes(0, 0, new Long[] { 2L }, new Long[] { 3L, 3L }, 209 new Long[] { 5L, 4L, 3L }); 210 verifySingleStripeCompaction(policy, si, 2, null); 211 // Several stripes have possible compactions; choose best quality (removes most files) 212 si = createStripesWithSizes(0, 0, new Long[] { 3L, 2L, 2L }, new Long[] { 2L, 2L, 1L }, 213 new Long[] { 3L, 2L, 2L, 1L }); 214 verifySingleStripeCompaction(policy, si, 2, null); 215 si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 3L, 2L, 2L, 1L }, 216 new Long[] { 3L, 2L, 2L }); 217 verifySingleStripeCompaction(policy, si, 1, null); 218 // Or with smallest files, if the count is the same 219 si = createStripesWithSizes(0, 0, new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L }, 220 new Long[] { 3L, 2L, 2L }); 221 verifySingleStripeCompaction(policy, si, 1, null); 222 // Verify max count is respected. 223 si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L }); 224 List<HStoreFile> sfs = si.getStripes().get(1).subList(1, 5); 225 verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true); 226 // Verify ratio is applied. 227 si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 50L, 4L, 4L, 4L, 4L }); 228 sfs = si.getStripes().get(1).subList(1, 5); 229 verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true); 230 } 231 232 @Test 233 public void testWithParallelCompaction() throws Exception { 234 // TODO: currently only one compaction at a time per store is allowed. If this changes, 235 // the appropriate file exclusion testing would need to be done in respective tests. 236 assertNull(createPolicy(HBaseConfiguration.create()) 237 .selectCompaction(mock(StripeInformationProvider.class), al(createFile()), false)); 238 } 239 240 @Test 241 public void testWithReferences() throws Exception { 242 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create()); 243 StripeCompactor sc = mock(StripeCompactor.class); 244 HStoreFile ref = createFile(); 245 when(ref.isReference()).thenReturn(true); 246 StripeInformationProvider si = mock(StripeInformationProvider.class); 247 Collection<HStoreFile> sfs = al(ref, createFile()); 248 when(si.getStoreFiles()).thenReturn(sfs); 249 250 assertTrue(policy.needsCompactions(si, al())); 251 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); 252 // UnmodifiableCollection does not implement equals so we need to change it here to a 253 // collection that implements it. 254 assertEquals(si.getStoreFiles(), new ArrayList<>(scr.getRequest().getFiles())); 255 scr.execute(sc, NoLimitThroughputController.INSTANCE, null); 256 verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY), 257 aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), any(), any()); 258 } 259 260 @Test 261 public void testInitialCountFromL0() throws Exception { 262 Configuration conf = HBaseConfiguration.create(); 263 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2); 264 StripeCompactionPolicy policy = 265 createPolicy(conf, defaultSplitSize, defaultSplitCount, 2, false); 266 StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8); 267 verifyCompaction(policy, si, si.getStoreFiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true); 268 si = createStripesL0Only(3, 10); // If result would be too large, split into smaller parts. 269 verifyCompaction(policy, si, si.getStoreFiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true); 270 policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false); 271 verifyCompaction(policy, si, si.getStoreFiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true); 272 } 273 274 @Test 275 public void testSelectL0Compaction() throws Exception { 276 // test select ALL L0 files when L0 files count > MIN_FILES_L0_KEY 277 Configuration conf = HBaseConfiguration.create(); 278 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4); 279 StripeCompactionPolicy policy = createPolicy(conf); 280 StripeCompactionPolicy.StripeInformationProvider si = createStripesWithSizes(10, 10L, 281 new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L }, new Long[] { 3L, 2L, 2L }); 282 StripeCompactionPolicy.StripeCompactionRequest cr = policy.selectCompaction(si, al(), false); 283 assertNotNull(cr); 284 assertEquals(10, cr.getRequest().getFiles().size()); 285 verifyCollectionsEqual(si.getLevel0Files(), cr.getRequest().getFiles()); 286 287 // test select partial L0 files when size of L0 files > HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY 288 conf.setLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, 100L); 289 policy = createPolicy(conf); 290 si = createStripesWithSizes(5, 50L, new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L }, 291 new Long[] { 3L, 2L, 2L }); 292 cr = policy.selectCompaction(si, al(), false); 293 assertNotNull(cr); 294 assertEquals(2, cr.getRequest().getFiles().size()); 295 verifyCollectionsEqual(si.getLevel0Files().subList(0, 2), cr.getRequest().getFiles()); 296 297 // test select partial L0 files when count of L0 files > MAX_FILES_KEY 298 conf.setInt(MAX_FILES_KEY, 6); 299 conf.setLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, 1000L); 300 policy = createPolicy(conf); 301 si = createStripesWithSizes(10, 10L, new Long[] { 5L, 1L, 1L }, new Long[] { 3L, 2L, 2L }, 302 new Long[] { 3L, 2L, 2L }); 303 cr = policy.selectCompaction(si, al(), false); 304 assertNotNull(cr); 305 assertEquals(6, cr.getRequest().getFiles().size()); 306 verifyCollectionsEqual(si.getLevel0Files().subList(0, 6), cr.getRequest().getFiles()); 307 } 308 309 @Test 310 public void testExistingStripesFromL0() throws Exception { 311 Configuration conf = HBaseConfiguration.create(); 312 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 3); 313 StripeCompactionPolicy.StripeInformationProvider si = createStripes(3, KEY_A); 314 verifyCompaction(createPolicy(conf), si, si.getLevel0Files(), null, null, 315 si.getStripeBoundaries()); 316 } 317 318 @Test 319 public void testNothingToCompactFromL0() throws Exception { 320 Configuration conf = HBaseConfiguration.create(); 321 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4); 322 StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 10); 323 StripeCompactionPolicy policy = createPolicy(conf); 324 verifyNoCompaction(policy, si); 325 326 si = createStripes(3, KEY_A); 327 verifyNoCompaction(policy, si); 328 } 329 330 @Test 331 public void testCheckExpiredStripeCompaction() throws Exception { 332 Configuration conf = HBaseConfiguration.create(); 333 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 5); 334 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4); 335 336 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 337 long now = defaultTtl + 2; 338 edge.setValue(now); 339 EnvironmentEdgeManager.injectEdge(edge); 340 HStoreFile expiredFile = createFile(10), notExpiredFile = createFile(10); 341 when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1); 342 when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1); 343 List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile); 344 List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile); 345 346 StripeCompactionPolicy policy = 347 createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, true); 348 // Merge expired if there are eligible stripes. 349 StripeCompactionPolicy.StripeInformationProvider si = 350 createStripesWithFiles(mixed, mixed, mixed); 351 assertFalse(policy.needsCompactions(si, al())); 352 353 si = createStripesWithFiles(mixed, mixed, mixed, expired); 354 assertFalse(policy.needsSingleStripeCompaction(si)); 355 assertTrue(policy.hasExpiredStripes(si)); 356 assertTrue(policy.needsCompactions(si, al())); 357 } 358 359 @Test 360 public void testSplitOffStripe() throws Exception { 361 Configuration conf = HBaseConfiguration.create(); 362 // Test depends on this not being set to pass. Default breaks test. TODO: Revisit. 363 conf.unset("hbase.hstore.compaction.min.size"); 364 // First test everything with default split count of 2, then split into more. 365 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2); 366 Long[] toSplit = new Long[] { defaultSplitSize - 2, 1L, 1L }; 367 Long[] noSplit = new Long[] { defaultSplitSize - 2, 1L }; 368 long splitTargetSize = (long) (defaultSplitSize / defaultSplitCount); 369 // Don't split if not eligible for compaction. 370 StripeCompactionPolicy.StripeInformationProvider si = 371 createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 2L }); 372 assertNull(createPolicy(conf).selectCompaction(si, al(), false)); 373 // Make sure everything is eligible. 374 conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 500f); 375 StripeCompactionPolicy policy = createPolicy(conf); 376 verifyWholeStripesCompaction(policy, si, 0, 0, null, 2, splitTargetSize); 377 // Add some extra stripes... 378 si = createStripesWithSizes(0, 0, noSplit, noSplit, toSplit); 379 verifyWholeStripesCompaction(policy, si, 2, 2, null, 2, splitTargetSize); 380 // In the middle. 381 si = createStripesWithSizes(0, 0, noSplit, toSplit, noSplit); 382 verifyWholeStripesCompaction(policy, si, 1, 1, null, 2, splitTargetSize); 383 // No split-off with different config (larger split size). 384 // However, in this case some eligible stripe will just be compacted alone. 385 StripeCompactionPolicy specPolicy = 386 createPolicy(conf, defaultSplitSize + 1, defaultSplitCount, defaultInitialCount, false); 387 verifySingleStripeCompaction(specPolicy, si, 1, null); 388 } 389 390 @Test 391 public void testSplitOffStripeOffPeak() throws Exception { 392 // for HBASE-11439 393 Configuration conf = HBaseConfiguration.create(); 394 395 // Test depends on this not being set to pass. Default breaks test. TODO: Revisit. 396 conf.unset("hbase.hstore.compaction.min.size"); 397 398 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2); 399 // Select the last 2 files. 400 StripeCompactionPolicy.StripeInformationProvider si = 401 createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 1L, 1L }); 402 assertEquals(2, 403 createPolicy(conf).selectCompaction(si, al(), false).getRequest().getFiles().size()); 404 // Make sure everything is eligible in offpeak. 405 conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 500f); 406 assertEquals(3, 407 createPolicy(conf).selectCompaction(si, al(), true).getRequest().getFiles().size()); 408 } 409 410 @Test 411 public void testSplitOffStripeDropDeletes() throws Exception { 412 Configuration conf = HBaseConfiguration.create(); 413 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2); 414 StripeCompactionPolicy policy = createPolicy(conf); 415 Long[] toSplit = new Long[] { defaultSplitSize / 2, defaultSplitSize / 2 }; 416 Long[] noSplit = new Long[] { 1L }; 417 long splitTargetSize = (long) (defaultSplitSize / defaultSplitCount); 418 419 // Verify the deletes can be dropped if there are no L0 files. 420 StripeCompactionPolicy.StripeInformationProvider si = 421 createStripesWithSizes(0, 0, noSplit, toSplit); 422 verifyWholeStripesCompaction(policy, si, 1, 1, true, null, splitTargetSize); 423 // But cannot be dropped if there are. 424 si = createStripesWithSizes(2, 2, noSplit, toSplit); 425 verifyWholeStripesCompaction(policy, si, 1, 1, false, null, splitTargetSize); 426 } 427 428 @SuppressWarnings("unchecked") 429 @Test 430 public void testMergeExpiredFiles() throws Exception { 431 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 432 long now = defaultTtl + 2; 433 edge.setValue(now); 434 EnvironmentEdgeManager.injectEdge(edge); 435 try { 436 HStoreFile expiredFile = createFile(), notExpiredFile = createFile(); 437 when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1); 438 when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1); 439 List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile); 440 List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile); 441 List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile); 442 443 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(), defaultSplitSize, 444 defaultSplitCount, defaultInitialCount, true); 445 // Merge expired if there are eligible stripes. 446 StripeCompactionPolicy.StripeInformationProvider si = 447 createStripesWithFiles(expired, expired, expired); 448 verifyWholeStripesCompaction(policy, si, 0, 2, null, 1, Long.MAX_VALUE, false); 449 // Don't merge if nothing expired. 450 si = createStripesWithFiles(notExpired, notExpired, notExpired); 451 assertNull(policy.selectCompaction(si, al(), false)); 452 // Merge one expired stripe with next. 453 si = createStripesWithFiles(notExpired, expired, notExpired); 454 verifyWholeStripesCompaction(policy, si, 1, 2, null, 1, Long.MAX_VALUE, false); 455 // Merge the biggest run out of multiple options. 456 // Merge one expired stripe with next. 457 si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired); 458 verifyWholeStripesCompaction(policy, si, 3, 4, null, 1, Long.MAX_VALUE, false); 459 // Stripe with a subset of expired files is not merged. 460 si = createStripesWithFiles(expired, expired, notExpired, expired, mixed); 461 verifyWholeStripesCompaction(policy, si, 0, 1, null, 1, Long.MAX_VALUE, false); 462 } finally { 463 EnvironmentEdgeManager.reset(); 464 } 465 } 466 467 @SuppressWarnings("unchecked") 468 @Test 469 public void testMergeExpiredStripes() throws Exception { 470 // HBASE-11397 471 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 472 long now = defaultTtl + 2; 473 edge.setValue(now); 474 EnvironmentEdgeManager.injectEdge(edge); 475 try { 476 HStoreFile expiredFile = createFile(), notExpiredFile = createFile(); 477 when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1); 478 when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1); 479 List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile); 480 List<HStoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile); 481 482 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(), defaultSplitSize, 483 defaultSplitCount, defaultInitialCount, true); 484 485 // Merge all three expired stripes into one. 486 StripeCompactionPolicy.StripeInformationProvider si = 487 createStripesWithFiles(expired, expired, expired); 488 verifyMergeCompatcion(policy, si, 0, 2); 489 490 // Merge two adjacent expired stripes into one. 491 si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired); 492 verifyMergeCompatcion(policy, si, 3, 4); 493 } finally { 494 EnvironmentEdgeManager.reset(); 495 } 496 } 497 498 @SuppressWarnings("unchecked") 499 private static StripeCompactionPolicy.StripeInformationProvider 500 createStripesWithFiles(List<HStoreFile>... stripeFiles) throws Exception { 501 return createStripesWithFiles(createBoundaries(stripeFiles.length), 502 Lists.newArrayList(stripeFiles), new ArrayList<>()); 503 } 504 505 @Test 506 public void testSingleStripeDropDeletes() throws Exception { 507 Configuration conf = HBaseConfiguration.create(); 508 // Test depends on this not being set to pass. Default breaks test. TODO: Revisit. 509 conf.unset("hbase.hstore.compaction.min.size"); 510 StripeCompactionPolicy policy = createPolicy(conf); 511 // Verify the deletes can be dropped if there are no L0 files. 512 Long[][] stripes = new Long[][] { new Long[] { 3L, 2L, 2L, 2L }, new Long[] { 6L } }; 513 StripeInformationProvider si = createStripesWithSizes(0, 0, stripes); 514 verifySingleStripeCompaction(policy, si, 0, true); 515 // But cannot be dropped if there are. 516 si = createStripesWithSizes(2, 2, stripes); 517 verifySingleStripeCompaction(policy, si, 0, false); 518 // Unless there are enough to cause L0 compaction. 519 si = createStripesWithSizes(6, 2, stripes); 520 ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>(); 521 sfs.addSublist(si.getLevel0Files()); 522 sfs.addSublist(si.getStripes().get(0)); 523 verifyCompaction(policy, si, sfs, si.getStartRow(0), si.getEndRow(0), si.getStripeBoundaries()); 524 // If we cannot actually compact all files in some stripe, L0 is chosen. 525 si = createStripesWithSizes(6, 2, 526 new Long[][] { new Long[] { 10L, 1L, 1L, 1L, 1L }, new Long[] { 12L } }); 527 verifyCompaction(policy, si, si.getLevel0Files(), null, null, si.getStripeBoundaries()); 528 // even if L0 has no file 529 // if all files of stripe aren't selected, delete must not be dropped. 530 stripes = new Long[][] { new Long[] { 100L, 3L, 2L, 2L, 2L }, new Long[] { 6L } }; 531 si = createStripesWithSizes(0, 0, stripes); 532 List<HStoreFile> compactFile = new ArrayList<>(); 533 Iterator<HStoreFile> iter = si.getStripes().get(0).listIterator(1); 534 while (iter.hasNext()) { 535 compactFile.add(iter.next()); 536 } 537 verifyCompaction(policy, si, compactFile, false, 1, null, si.getStartRow(0), si.getEndRow(0), 538 true); 539 } 540 541 @Test 542 public void testCheckExpiredL0Compaction() throws Exception { 543 Configuration conf = HBaseConfiguration.create(); 544 int minL0 = 100; 545 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, minL0); 546 conf.setInt(MIN_FILES_KEY, 4); 547 548 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 549 long now = defaultTtl + 2; 550 edge.setValue(now); 551 EnvironmentEdgeManager.injectEdge(edge); 552 HStoreFile expiredFile = createFile(10), notExpiredFile = createFile(10); 553 when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1); 554 when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1); 555 List<HStoreFile> expired = Lists.newArrayList(expiredFile, expiredFile); 556 List<HStoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile); 557 558 StripeCompactionPolicy policy = 559 createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, true); 560 // Merge expired if there are eligible stripes. 561 StripeCompactionPolicy.StripeInformationProvider si = 562 createStripesWithFiles(null, new ArrayList<>(), mixed); 563 assertFalse(policy.needsCompactions(si, al())); 564 565 List<HStoreFile> largeMixed = new ArrayList<>(); 566 for (int i = 0; i < minL0 - 1; i++) { 567 largeMixed.add(i % 2 == 0 ? notExpiredFile : expiredFile); 568 } 569 si = createStripesWithFiles(null, new ArrayList<>(), largeMixed); 570 assertFalse(policy.needsCompactions(si, al())); 571 572 si = createStripesWithFiles(null, new ArrayList<>(), expired); 573 assertFalse(policy.needsSingleStripeCompaction(si)); 574 assertFalse(policy.hasExpiredStripes(si)); 575 assertTrue(policy.allL0FilesExpired(si)); 576 assertTrue(policy.needsCompactions(si, al())); 577 } 578 579 /********* HELPER METHODS ************/ 580 private static StripeCompactionPolicy createPolicy(Configuration conf) throws Exception { 581 return createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, false); 582 } 583 584 private static StripeCompactionPolicy createPolicy(Configuration conf, long splitSize, 585 float splitCount, int initialCount, boolean hasTtl) throws Exception { 586 conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize); 587 conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount); 588 conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount); 589 StoreConfigInformation sci = mock(StoreConfigInformation.class); 590 when(sci.getStoreFileTtl()).thenReturn(hasTtl ? defaultTtl : Long.MAX_VALUE); 591 when(sci.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO); 592 StripeStoreConfig ssc = new StripeStoreConfig(conf, sci); 593 return new StripeCompactionPolicy(conf, sci, ssc); 594 } 595 596 private static ArrayList<HStoreFile> al(HStoreFile... sfs) { 597 return new ArrayList<>(Arrays.asList(sfs)); 598 } 599 600 private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si, 601 int from, int to) throws Exception { 602 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); 603 Collection<HStoreFile> sfs = getAllFiles(si, from, to); 604 verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); 605 606 // All the Stripes are expired, so the Compactor will not create any Writers. We need to create 607 // an empty file to preserve metadata 608 StripeCompactor sc = createCompactor(); 609 List<Path> paths = scr.execute(sc, NoLimitThroughputController.INSTANCE, null); 610 assertEquals(1, paths.size()); 611 } 612 613 /** 614 * Verify the compaction that includes several entire stripes. 615 * @param policy Policy to test. 616 * @param si Stripe information pre-set with stripes to test. 617 * @param from Starting stripe. 618 * @param to Ending stripe (inclusive). 619 * @param dropDeletes Whether to drop deletes from compaction range. 620 * @param count Expected # of resulting stripes, null if not checked. 621 * @param size Expected target stripe size, null if not checked. 622 */ 623 private void verifyWholeStripesCompaction(StripeCompactionPolicy policy, 624 StripeInformationProvider si, int from, int to, Boolean dropDeletes, Integer count, Long size, 625 boolean needsCompaction) throws IOException { 626 verifyCompaction(policy, si, getAllFiles(si, from, to), dropDeletes, count, size, 627 si.getStartRow(from), si.getEndRow(to), needsCompaction); 628 } 629 630 private void verifyWholeStripesCompaction(StripeCompactionPolicy policy, 631 StripeInformationProvider si, int from, int to, Boolean dropDeletes, Integer count, Long size) 632 throws IOException { 633 verifyWholeStripesCompaction(policy, si, from, to, dropDeletes, count, size, true); 634 } 635 636 private void verifySingleStripeCompaction(StripeCompactionPolicy policy, 637 StripeInformationProvider si, int index, Boolean dropDeletes) throws IOException { 638 verifyWholeStripesCompaction(policy, si, index, index, dropDeletes, 1, null, true); 639 } 640 641 /** 642 * Verify no compaction is needed or selected. 643 * @param policy Policy to test. 644 * @param si Stripe information pre-set with stripes to test. 645 */ 646 private void verifyNoCompaction(StripeCompactionPolicy policy, StripeInformationProvider si) 647 throws IOException { 648 assertNull(policy.selectCompaction(si, al(), false)); 649 assertFalse(policy.needsCompactions(si, al())); 650 } 651 652 /** 653 * Verify arbitrary compaction. 654 * @param policy Policy to test. 655 * @param si Stripe information pre-set with stripes to test. 656 * @param sfs Files that should be compacted. 657 * @param dropDeletesFrom Row from which to drop deletes. 658 * @param dropDeletesTo Row to which to drop deletes. 659 * @param boundaries Expected target stripe boundaries. 660 */ 661 private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si, 662 Collection<HStoreFile> sfs, byte[] dropDeletesFrom, byte[] dropDeletesTo, 663 final List<byte[]> boundaries) throws Exception { 664 StripeCompactor sc = mock(StripeCompactor.class); 665 assertTrue(policy.needsCompactions(si, al())); 666 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); 667 verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); 668 scr.execute(sc, NoLimitThroughputController.INSTANCE, null); 669 verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() { 670 @Override 671 public boolean matches(List<byte[]> argument) { 672 List<byte[]> other = argument; 673 if (other.size() != boundaries.size()) { 674 return false; 675 } 676 for (int i = 0; i < other.size(); ++i) { 677 if (!Bytes.equals(other.get(i), boundaries.get(i))) { 678 return false; 679 } 680 } 681 return true; 682 } 683 }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom), 684 dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo), any(), any()); 685 } 686 687 /** 688 * Verify arbitrary compaction. 689 * @param policy Policy to test. 690 * @param si Stripe information pre-set with stripes to test. 691 * @param sfs Files that should be compacted. 692 * @param dropDeletes Whether to drop deletes from compaction range. 693 * @param count Expected # of resulting stripes, null if not checked. 694 * @param size Expected target stripe size, null if not checked. 695 * @param start Left boundary of the compaction. 696 * @param end Right boundary of the compaction. 697 */ 698 private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si, 699 Collection<HStoreFile> sfs, Boolean dropDeletes, Integer count, Long size, byte[] start, 700 byte[] end, boolean needsCompaction) throws IOException { 701 StripeCompactor sc = mock(StripeCompactor.class); 702 assertTrue(!needsCompaction || policy.needsCompactions(si, al())); 703 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); 704 verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); 705 scr.execute(sc, NoLimitThroughputController.INSTANCE, null); 706 verify(sc, times(1)).compact(eq(scr.getRequest()), 707 count == null ? anyInt() : eq(count.intValue()), 708 size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end), 709 dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end), any(), any()); 710 } 711 712 /** Verify arbitrary flush. */ 713 protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si, 714 KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException { 715 StoreFileWritersCapture writers = new StoreFileWritersCapture(); 716 StripeStoreFlusher.StripeFlushRequest req = 717 policy.selectFlush(CellComparatorImpl.COMPARATOR, si, input.length); 718 StripeMultiFileWriter mw = req.createWriter(); 719 mw.init(null, writers); 720 for (KeyValue kv : input) { 721 mw.append(kv); 722 } 723 boolean hasMetadata = boundaries != null; 724 mw.commitWriters(0, false); 725 writers.verifyKvs(expected, true, hasMetadata); 726 if (hasMetadata) { 727 writers.verifyBoundaries(boundaries); 728 } 729 } 730 731 private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) { 732 return dropDeletes == null 733 ? any() 734 : (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class)); 735 } 736 737 private void verifyCollectionsEqual(Collection<HStoreFile> sfs, Collection<HStoreFile> scr) { 738 // Dumb. 739 assertEquals(sfs.size(), scr.size()); 740 assertTrue(scr.containsAll(sfs)); 741 } 742 743 private static List<HStoreFile> getAllFiles(StripeInformationProvider si, int fromStripe, 744 int toStripe) { 745 ArrayList<HStoreFile> expected = new ArrayList<>(); 746 for (int i = fromStripe; i <= toStripe; ++i) { 747 expected.addAll(si.getStripes().get(i)); 748 } 749 return expected; 750 } 751 752 /** 753 * @param l0Count Number of L0 files. 754 * @param boundaries Target boundaries. 755 * @return Mock stripes. 756 */ 757 private static StripeInformationProvider createStripes(int l0Count, byte[]... boundaries) 758 throws Exception { 759 List<Long> l0Sizes = new ArrayList<>(); 760 for (int i = 0; i < l0Count; ++i) { 761 l0Sizes.add(5L); 762 } 763 List<List<Long>> sizes = new ArrayList<>(); 764 for (int i = 0; i <= boundaries.length; ++i) { 765 sizes.add(Arrays.asList(Long.valueOf(5))); 766 } 767 return createStripes(Arrays.asList(boundaries), sizes, l0Sizes); 768 } 769 770 /** 771 * @param l0Count Number of L0 files. 772 * @param l0Size Size of each file. 773 * @return Mock stripes. 774 */ 775 private static StripeInformationProvider createStripesL0Only(int l0Count, long l0Size) 776 throws Exception { 777 List<Long> l0Sizes = new ArrayList<>(); 778 for (int i = 0; i < l0Count; ++i) { 779 l0Sizes.add(l0Size); 780 } 781 return createStripes(null, new ArrayList<>(), l0Sizes); 782 } 783 784 /** 785 * @param l0Count Number of L0 files. 786 * @param l0Size Size of each file. 787 * @param sizes Sizes of the files; each sub-array representing a stripe. 788 * @return Mock stripes. 789 */ 790 private static StripeInformationProvider createStripesWithSizes(int l0Count, long l0Size, 791 Long[]... sizes) throws Exception { 792 ArrayList<List<Long>> sizeList = new ArrayList<>(sizes.length); 793 for (Long[] size : sizes) { 794 sizeList.add(Arrays.asList(size)); 795 } 796 return createStripesWithSizes(l0Count, l0Size, sizeList); 797 } 798 799 private static StripeInformationProvider createStripesWithSizes(int l0Count, long l0Size, 800 List<List<Long>> sizes) throws Exception { 801 List<byte[]> boundaries = createBoundaries(sizes.size()); 802 List<Long> l0Sizes = new ArrayList<>(); 803 for (int i = 0; i < l0Count; ++i) { 804 l0Sizes.add(l0Size); 805 } 806 return createStripes(boundaries, sizes, l0Sizes); 807 } 808 809 private static List<byte[]> createBoundaries(int stripeCount) { 810 byte[][] keys = new byte[][] { KEY_A, KEY_B, KEY_C, KEY_D, KEY_E }; 811 assert stripeCount <= keys.length + 1; 812 List<byte[]> boundaries = new ArrayList<>(); 813 boundaries.addAll(Arrays.asList(keys).subList(0, stripeCount - 1)); 814 return boundaries; 815 } 816 817 private static StripeInformationProvider createStripes(List<byte[]> boundaries, 818 List<List<Long>> stripeSizes, List<Long> l0Sizes) throws Exception { 819 List<List<HStoreFile>> stripeFiles = new ArrayList<>(stripeSizes.size()); 820 for (List<Long> sizes : stripeSizes) { 821 List<HStoreFile> sfs = new ArrayList<>(sizes.size()); 822 for (Long size : sizes) { 823 sfs.add(createFile(size)); 824 } 825 stripeFiles.add(sfs); 826 } 827 List<HStoreFile> l0Files = new ArrayList<>(); 828 for (Long size : l0Sizes) { 829 l0Files.add(createFile(size)); 830 } 831 return createStripesWithFiles(boundaries, stripeFiles, l0Files); 832 } 833 834 /** 835 * This method actually does all the work. 836 */ 837 private static StripeInformationProvider createStripesWithFiles(List<byte[]> boundaries, 838 List<List<HStoreFile>> stripeFiles, List<HStoreFile> l0Files) throws Exception { 839 ArrayList<ImmutableList<HStoreFile>> stripes = new ArrayList<>(); 840 ArrayList<byte[]> boundariesList = new ArrayList<>(); 841 StripeInformationProvider si = mock(StripeInformationProvider.class); 842 if (!stripeFiles.isEmpty()) { 843 assert stripeFiles.size() == (boundaries.size() + 1); 844 boundariesList.add(OPEN_KEY); 845 for (int i = 0; i <= boundaries.size(); ++i) { 846 byte[] startKey = ((i == 0) ? OPEN_KEY : boundaries.get(i - 1)); 847 byte[] endKey = ((i == boundaries.size()) ? OPEN_KEY : boundaries.get(i)); 848 boundariesList.add(endKey); 849 for (HStoreFile sf : stripeFiles.get(i)) { 850 setFileStripe(sf, startKey, endKey); 851 } 852 stripes.add(ImmutableList.copyOf(stripeFiles.get(i))); 853 when(si.getStartRow(eq(i))).thenReturn(startKey); 854 when(si.getEndRow(eq(i))).thenReturn(endKey); 855 } 856 } 857 ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>(); 858 sfs.addAllSublists(stripes); 859 sfs.addSublist(l0Files); 860 when(si.getStoreFiles()).thenReturn(sfs); 861 when(si.getStripes()).thenReturn(stripes); 862 when(si.getStripeBoundaries()).thenReturn(boundariesList); 863 when(si.getStripeCount()).thenReturn(stripes.size()); 864 when(si.getLevel0Files()).thenReturn(l0Files); 865 return si; 866 } 867 868 private static HStoreFile createFile(long size) throws Exception { 869 HStoreFile sf = mock(HStoreFile.class); 870 when(sf.getPath()).thenReturn(new Path("moo")); 871 StoreFileReader r = mock(StoreFileReader.class); 872 when(r.getEntries()).thenReturn(size); 873 when(r.length()).thenReturn(size); 874 when(r.getBloomFilterType()).thenReturn(BloomType.NONE); 875 when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class)); 876 when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(), 877 anyBoolean())).thenReturn(mock(StoreFileScanner.class)); 878 when(sf.getReader()).thenReturn(r); 879 when(sf.getBulkLoadTimestamp()).thenReturn(OptionalLong.empty()); 880 when(r.getMaxTimestamp()).thenReturn(TimeRange.INITIAL_MAX_TIMESTAMP); 881 return sf; 882 } 883 884 private static HStoreFile createFile() throws Exception { 885 return createFile(0); 886 } 887 888 private static void setFileStripe(HStoreFile sf, byte[] startKey, byte[] endKey) { 889 when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey); 890 when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey); 891 } 892 893 private StripeCompactor createCompactor() throws Exception { 894 ColumnFamilyDescriptor familyDescriptor = 895 ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("foo")); 896 StoreFileWritersCapture writers = new StoreFileWritersCapture(); 897 HStore store = mock(HStore.class); 898 RegionInfo info = mock(RegionInfo.class); 899 when(info.getRegionNameAsString()).thenReturn("testRegion"); 900 when(store.getColumnFamilyDescriptor()).thenReturn(familyDescriptor); 901 when(store.getRegionInfo()).thenReturn(info); 902 StoreEngine storeEngine = mock(StoreEngine.class); 903 when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers); 904 when(store.getStoreEngine()).thenReturn(storeEngine); 905 906 Configuration conf = HBaseConfiguration.create(); 907 conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders); 908 final Scanner scanner = new Scanner(); 909 return new StripeCompactor(conf, store) { 910 @Override 911 protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, 912 List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs, 913 byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { 914 return scanner; 915 } 916 917 @Override 918 protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, 919 List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint, 920 long earliestPutTs) throws IOException { 921 return scanner; 922 } 923 }; 924 } 925 926 private static class Scanner implements InternalScanner { 927 private final ArrayList<KeyValue> kvs; 928 929 public Scanner(KeyValue... kvs) { 930 this.kvs = new ArrayList<>(Arrays.asList(kvs)); 931 } 932 933 @Override 934 public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext) 935 throws IOException { 936 if (kvs.isEmpty()) { 937 return false; 938 } 939 result.add(kvs.remove(0)); 940 return !kvs.isEmpty(); 941 } 942 943 @Override 944 public void close() throws IOException { 945 } 946 } 947}