001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY; 021import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_ON_READ_KEY; 022import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_READ; 023import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_WRITE; 024import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE; 025import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY; 026import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY; 027import static org.junit.Assert.assertArrayEquals; 028import static org.junit.Assert.assertEquals; 029import static org.junit.Assert.assertFalse; 030import static org.junit.Assert.assertNull; 031import static org.junit.Assert.assertTrue; 032import static org.junit.Assert.fail; 033import static org.mockito.ArgumentMatchers.any; 034import static org.mockito.Mockito.mock; 035import static org.mockito.Mockito.spy; 036import static 
org.mockito.Mockito.times; 037import static org.mockito.Mockito.verify; 038import static org.mockito.Mockito.when; 039 040import java.io.FileNotFoundException; 041import java.io.IOException; 042import java.lang.ref.SoftReference; 043import java.security.PrivilegedExceptionAction; 044import java.util.ArrayList; 045import java.util.Arrays; 046import java.util.Collection; 047import java.util.Collections; 048import java.util.Iterator; 049import java.util.List; 050import java.util.ListIterator; 051import java.util.NavigableSet; 052import java.util.Optional; 053import java.util.TreeSet; 054import java.util.concurrent.BrokenBarrierException; 055import java.util.concurrent.ConcurrentSkipListSet; 056import java.util.concurrent.CountDownLatch; 057import java.util.concurrent.CyclicBarrier; 058import java.util.concurrent.ExecutorService; 059import java.util.concurrent.Executors; 060import java.util.concurrent.Future; 061import java.util.concurrent.ThreadPoolExecutor; 062import java.util.concurrent.TimeUnit; 063import java.util.concurrent.atomic.AtomicBoolean; 064import java.util.concurrent.atomic.AtomicInteger; 065import java.util.concurrent.atomic.AtomicLong; 066import java.util.concurrent.atomic.AtomicReference; 067import java.util.concurrent.locks.ReentrantReadWriteLock; 068import java.util.function.Consumer; 069import java.util.function.IntBinaryOperator; 070import org.apache.hadoop.conf.Configuration; 071import org.apache.hadoop.fs.FSDataOutputStream; 072import org.apache.hadoop.fs.FileStatus; 073import org.apache.hadoop.fs.FileSystem; 074import org.apache.hadoop.fs.FilterFileSystem; 075import org.apache.hadoop.fs.LocalFileSystem; 076import org.apache.hadoop.fs.Path; 077import org.apache.hadoop.fs.permission.FsPermission; 078import org.apache.hadoop.hbase.Cell; 079import org.apache.hadoop.hbase.CellBuilderFactory; 080import org.apache.hadoop.hbase.CellBuilderType; 081import org.apache.hadoop.hbase.CellComparator; 082import org.apache.hadoop.hbase.CellComparatorImpl; 
083import org.apache.hadoop.hbase.CellUtil; 084import org.apache.hadoop.hbase.ExtendedCell; 085import org.apache.hadoop.hbase.HBaseClassTestRule; 086import org.apache.hadoop.hbase.HBaseConfiguration; 087import org.apache.hadoop.hbase.HBaseTestingUtility; 088import org.apache.hadoop.hbase.HConstants; 089import org.apache.hadoop.hbase.KeyValue; 090import org.apache.hadoop.hbase.MemoryCompactionPolicy; 091import org.apache.hadoop.hbase.NamespaceDescriptor; 092import org.apache.hadoop.hbase.PrivateCellUtil; 093import org.apache.hadoop.hbase.TableName; 094import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 095import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 096import org.apache.hadoop.hbase.client.Get; 097import org.apache.hadoop.hbase.client.RegionInfo; 098import org.apache.hadoop.hbase.client.RegionInfoBuilder; 099import org.apache.hadoop.hbase.client.Scan; 100import org.apache.hadoop.hbase.client.Scan.ReadType; 101import org.apache.hadoop.hbase.client.TableDescriptor; 102import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 103import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 104import org.apache.hadoop.hbase.filter.Filter; 105import org.apache.hadoop.hbase.filter.FilterBase; 106import org.apache.hadoop.hbase.io.compress.Compression; 107import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 108import org.apache.hadoop.hbase.io.hfile.CacheConfig; 109import org.apache.hadoop.hbase.io.hfile.HFile; 110import org.apache.hadoop.hbase.io.hfile.HFileContext; 111import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 112import org.apache.hadoop.hbase.monitoring.MonitoredTask; 113import org.apache.hadoop.hbase.nio.RefCnt; 114import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl; 115import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType; 116import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action; 117import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 118import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 119import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 120import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy; 121import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 122import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 123import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 124import org.apache.hadoop.hbase.security.User; 125import org.apache.hadoop.hbase.testclassification.MediumTests; 126import org.apache.hadoop.hbase.testclassification.RegionServerTests; 127import org.apache.hadoop.hbase.util.BloomFilterUtil; 128import org.apache.hadoop.hbase.util.Bytes; 129import org.apache.hadoop.hbase.util.CommonFSUtils; 130import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 131import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 132import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 133import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 134import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 135import org.apache.hadoop.hbase.wal.WALFactory; 136import org.apache.hadoop.util.Progressable; 137import org.junit.After; 138import org.junit.AfterClass; 139import org.junit.Before; 140import org.junit.ClassRule; 141import org.junit.Rule; 142import org.junit.Test; 143import org.junit.experimental.categories.Category; 144import org.junit.rules.TestName; 145import org.mockito.Mockito; 146import org.slf4j.Logger; 147import org.slf4j.LoggerFactory; 148 149import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 150 151/** 152 * Test class for the HStore 153 */ 154@Category({ RegionServerTests.class, MediumTests.class }) 155public class TestHStore { 156 157 @ClassRule 158 public static final HBaseClassTestRule CLASS_RULE = 
HBaseClassTestRule.forClass(TestHStore.class); 159 160 private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class); 161 @Rule 162 public TestName name = new TestName(); 163 164 HRegion region; 165 HStore store; 166 byte[] table = Bytes.toBytes("table"); 167 byte[] family = Bytes.toBytes("family"); 168 169 byte[] row = Bytes.toBytes("row"); 170 byte[] row2 = Bytes.toBytes("row2"); 171 byte[] qf1 = Bytes.toBytes("qf1"); 172 byte[] qf2 = Bytes.toBytes("qf2"); 173 byte[] qf3 = Bytes.toBytes("qf3"); 174 byte[] qf4 = Bytes.toBytes("qf4"); 175 byte[] qf5 = Bytes.toBytes("qf5"); 176 byte[] qf6 = Bytes.toBytes("qf6"); 177 178 NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR); 179 180 List<Cell> expected = new ArrayList<>(); 181 List<Cell> result = new ArrayList<>(); 182 183 long id = EnvironmentEdgeManager.currentTime(); 184 Get get = new Get(row); 185 186 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 187 private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString(); 188 189 @Before 190 public void setUp() throws IOException { 191 qualifiers.clear(); 192 qualifiers.add(qf1); 193 qualifiers.add(qf3); 194 qualifiers.add(qf5); 195 196 Iterator<byte[]> iter = qualifiers.iterator(); 197 while (iter.hasNext()) { 198 byte[] next = iter.next(); 199 expected.add(new KeyValue(row, family, next, 1, (byte[]) null)); 200 get.addColumn(family, next); 201 } 202 } 203 204 private void init(String methodName) throws IOException { 205 init(methodName, TEST_UTIL.getConfiguration()); 206 } 207 208 private HStore init(String methodName, Configuration conf) throws IOException { 209 // some of the tests write 4 versions and then flush 210 // (with HBASE-4241, lower versions are collected on flush) 211 return init(methodName, conf, 212 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build()); 213 } 214 215 private HStore init(String methodName, Configuration conf, 
ColumnFamilyDescriptor hcd)
    throws IOException {
    // Delegates with a fresh TableDescriptorBuilder for the test table name.
    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
  }

  /** Initializes the store without a {@code MyStoreHook} (passes {@code null}). */
  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
    ColumnFamilyDescriptor hcd) throws IOException {
    return init(methodName, conf, builder, hcd, null);
  }

  /** Initializes the store with the given hook and {@code switchToPread} set to false. */
  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
    ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
    return init(methodName, conf, builder, hcd, hook, false);
  }

  /**
   * Creates the {@link HRegion} used by a test and assigns it to {@link #region}.
   * <p>
   * The table descriptor is built from {@code builder} plus {@code hcd}; directories live under
   * {@code DIR + methodName}. The region's {@code regionServicesForStores} is replaced by a
   * Mockito spy whose {@code getInMemoryCompactionPool()} returns a one-thread pool.
   * <p>
   * {@code hook} and {@code switchToPread} are accepted but not used in this method.
   */
  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
    TableDescriptor htd = builder.setColumnFamily(hcd).build();
    Path basedir = new Path(DIR + methodName);
    Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));

    FileSystem fs = FileSystem.get(conf);

    // Remove any WAL directory left under this test's base directory.
    fs.delete(logdir, true);
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
    // The WAL factory gets its own copy of the configuration rooted at this test's base directory.
    Configuration walConf = new Configuration(conf);
    CommonFSUtils.setRootDir(walConf, basedir);
    WALFactory wals = new WALFactory(walConf, methodName);
    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
      htd, null);
    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
    // NOTE(review): this executor is never shut down in the visible code -- confirm whether
    // region/store close takes care of it.
    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
  }

  private HStore
init(String methodName, Configuration conf, TableDescriptorBuilder builder,
    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
    // Creates the region, then a plain HStore when no hook is given or a MyStore wired with the
    // hook otherwise; the store is registered in region.stores under its family name.
    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
    if (hook == null) {
      store = new HStore(region, hcd, conf, false);
    } else {
      store = new MyStore(region, hcd, conf, hook, switchToPread);
    }
    region.stores.put(store.getColumnFamilyDescriptor().getName(), store);
    return store;
  }

  /**
   * Test we do not lose data if we fail a flush and then close. Part of HBase-10466
   * <p>
   * Runs as a dedicated test user with {@code FaultyFileSystem} installed as the {@code file}
   * file system and checks the memstore's flushable size before and after a failing flush.
   */
  @Test
  public void testFlushSizeSizing() throws Exception {
    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    // Only retry once.
    conf.setInt("hbase.hstore.flush.retries.number", 1);
    User user = User.createUserForTesting(conf, this.name.getMethodName(), new String[] { "foo" });
    // Inject our faulty LocalFileSystem
    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        // Make sure it worked (above is sensitive to caching details in hadoop core)
        FileSystem fs = FileSystem.get(conf);
        assertEquals(FaultyFileSystem.class, fs.getClass());
        FaultyFileSystem ffs = (FaultyFileSystem) fs;

        // Initialize region
        init(name.getMethodName(), conf);

        // Nothing has been written yet, so there is no data to flush.
        MemStoreSize mss = store.memstore.getFlushableSize();
        assertEquals(0, mss.getDataSize());
        LOG.info("Adding some data");
        // kvSize accumulates the size the store reports for the added cell.
        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
        // add the heap size of active (mutable) segment
        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
        mss =
store.memstore.getFlushableSize(); 297 assertEquals(kvSize.getMemStoreSize(), mss); 298 // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. 299 try { 300 LOG.info("Flushing"); 301 flushStore(store, id++); 302 fail("Didn't bubble up IOE!"); 303 } catch (IOException ioe) { 304 assertTrue(ioe.getMessage().contains("Fault injected")); 305 } 306 // due to snapshot, change mutable to immutable segment 307 kvSize.incMemStoreSize(0, 308 CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0); 309 mss = store.memstore.getFlushableSize(); 310 assertEquals(kvSize.getMemStoreSize(), mss); 311 MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing(); 312 store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2); 313 kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 314 // Even though we add a new kv, we expect the flushable size to be 'same' since we have 315 // not yet cleared the snapshot -- the above flush failed. 316 assertEquals(kvSize.getMemStoreSize(), mss); 317 ffs.fault.set(false); 318 flushStore(store, id++); 319 mss = store.memstore.getFlushableSize(); 320 // Size should be the foreground kv size. 
321 assertEquals(kvSize2.getMemStoreSize(), mss); 322 flushStore(store, id++); 323 mss = store.memstore.getFlushableSize(); 324 assertEquals(0, mss.getDataSize()); 325 assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize()); 326 return null; 327 } 328 }); 329 } 330 331 @Test 332 public void testStoreBloomFilterMetricsWithBloomRowCol() throws IOException { 333 int numStoreFiles = 5; 334 writeAndRead(BloomType.ROWCOL, numStoreFiles); 335 336 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 337 // hard to know exactly the numbers here, we are just trying to 338 // prove that they are incrementing 339 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 340 assertTrue(store.getBloomFilterNegativeResultsCount() > 0); 341 } 342 343 @Test 344 public void testStoreBloomFilterMetricsWithBloomRow() throws IOException { 345 int numStoreFiles = 5; 346 writeAndRead(BloomType.ROWCOL, numStoreFiles); 347 348 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 349 // hard to know exactly the numbers here, we are just trying to 350 // prove that they are incrementing 351 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 352 assertTrue(store.getBloomFilterNegativeResultsCount() > 0); 353 } 354 355 @Test 356 public void testStoreBloomFilterMetricsWithBloomRowPrefix() throws IOException { 357 int numStoreFiles = 5; 358 writeAndRead(BloomType.ROWPREFIX_FIXED_LENGTH, numStoreFiles); 359 360 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 361 // hard to know exactly the numbers here, we are just trying to 362 // prove that they are incrementing 363 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 364 } 365 366 @Test 367 public void testStoreBloomFilterMetricsWithBloomNone() throws IOException { 368 int numStoreFiles = 5; 369 writeAndRead(BloomType.NONE, numStoreFiles); 370 371 assertEquals(0, store.getBloomFilterRequestsCount()); 372 assertEquals(0, store.getBloomFilterNegativeResultsCount()); 373 
374 // hard to know exactly the numbers here, we are just trying to 375 // prove that they are incrementing 376 assertTrue(store.getBloomFilterEligibleRequestsCount() >= numStoreFiles); 377 } 378 379 private void writeAndRead(BloomType bloomType, int numStoreFiles) throws IOException { 380 Configuration conf = HBaseConfiguration.create(); 381 FileSystem fs = FileSystem.get(conf); 382 383 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 384 .setCompressionType(Compression.Algorithm.GZ).setBloomFilterType(bloomType) 385 .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "3").build(); 386 init(name.getMethodName(), conf, hcd); 387 388 for (int i = 1; i <= numStoreFiles; i++) { 389 byte[] row = Bytes.toBytes("row" + i); 390 LOG.info("Adding some data for the store file #" + i); 391 long timeStamp = EnvironmentEdgeManager.currentTime(); 392 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); 393 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); 394 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); 395 flush(i); 396 } 397 398 // Verify the total number of store files 399 assertEquals(numStoreFiles, this.store.getStorefiles().size()); 400 401 TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR); 402 columns.add(qf1); 403 404 for (int i = 1; i <= numStoreFiles; i++) { 405 KeyValueScanner scanner = 406 store.getScanner(new Scan(new Get(Bytes.toBytes("row" + i))), columns, 0); 407 scanner.peek(); 408 } 409 } 410 411 /** 412 * Verify that compression and data block encoding are respected by the createWriter method, used 413 * on store flush. 
414 */ 415 @Test 416 public void testCreateWriter() throws Exception { 417 Configuration conf = HBaseConfiguration.create(); 418 FileSystem fs = FileSystem.get(conf); 419 420 ColumnFamilyDescriptor hcd = 421 ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ) 422 .setDataBlockEncoding(DataBlockEncoding.DIFF).build(); 423 init(name.getMethodName(), conf, hcd); 424 425 // Test createWriter 426 StoreFileWriter writer = store.getStoreEngine() 427 .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4) 428 .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true) 429 .includesTag(false).shouldDropBehind(false)); 430 Path path = writer.getPath(); 431 writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1))); 432 writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2))); 433 writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3))); 434 writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4))); 435 writer.close(); 436 437 // Verify that compression and encoding settings are respected 438 HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf); 439 assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec()); 440 assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding()); 441 reader.close(); 442 } 443 444 @Test 445 public void testDeleteExpiredStoreFiles() throws Exception { 446 testDeleteExpiredStoreFiles(0); 447 testDeleteExpiredStoreFiles(1); 448 } 449 450 /** 451 * @param minVersions the MIN_VERSIONS for the column family 452 */ 453 public void testDeleteExpiredStoreFiles(int minVersions) throws Exception { 454 int storeFileNum = 4; 455 int ttl = 4; 456 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 457 EnvironmentEdgeManagerTestHelper.injectEdge(edge); 458 459 Configuration conf = HBaseConfiguration.create(); 460 // Enable the expired store file deletion 461 
conf.setBoolean("hbase.store.delete.expired.storefile", true);
    // Set the compaction threshold higher to avoid normal compactions.
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);

    // Use a distinct name per minVersions value so the two runs get separate directories.
    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
      .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());

    long storeTtl = this.store.getScanInfo().getTtl();
    // Advancing the clock by this much after each file spreads the files across one TTL.
    long sleepTime = storeTtl / storeFileNum;
    long timeStamp;
    // There are 4 store files and the max time stamp difference among these
    // store files will be (this.store.ttl / storeFileNum)
    for (int i = 1; i <= storeFileNum; i++) {
      LOG.info("Adding some data for the store file #" + i);
      timeStamp = EnvironmentEdgeManager.currentTime();
      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
      flush(i);
      edge.incrementTime(sleepTime);
    }

    // Verify the total number of store files
    assertEquals(storeFileNum, this.store.getStorefiles().size());

    // Each call will find one expired store file and delete it before compaction happens.
    // There will be no compaction due to threshold above. Last file will not be replaced.
    for (int i = 1; i <= storeFileNum - 1; i++) {
      // verify the expired store file.
      assertFalse(this.store.requestCompaction().isPresent());
      Collection<HStoreFile> sfs = this.store.getStorefiles();
      // Ensure i files are gone.
      if (minVersions == 0) {
        assertEquals(storeFileNum - i, sfs.size());
        // Ensure only non-expired files remain.
for (HStoreFile sf : sfs) {
          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
        }
      } else {
        // With minVersions != 0 the test expects no store file to have been removed.
        assertEquals(storeFileNum, sfs.size());
      }
      // Let the next store file expire.
      edge.incrementTime(sleepTime);
    }
    assertFalse(this.store.requestCompaction().isPresent());

    Collection<HStoreFile> sfs = this.store.getStorefiles();
    // Assert the last expired file is not removed.
    if (minVersions == 0) {
      assertEquals(1, sfs.size());
    }
    // The first remaining file must itself be older than the TTL by now.
    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
    assertTrue(ts < (edge.currentTime() - storeTtl));

    for (HStoreFile sf : sfs) {
      sf.closeStoreFile(true);
    }
  }

  /**
   * Checks that {@code StoreUtils.getLowestTimestamp} agrees with the lowest modification time
   * read from the file system, both after four flushes and after a compaction.
   */
  @Test
  public void testLowestModificationTime() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    // Initialize region
    init(name.getMethodName(), conf);

    int storeFileNum = 4;
    for (int i = 1; i <= storeFileNum; i++) {
      LOG.info("Adding some data for the store file #" + i);
      this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null);
      flush(i);
    }
    // after flush; check the lowest time stamp
    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);

    // after compact; check the lowest time stamp
    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
    assertEquals(lowestTimeStampFromManager,
lowestTimeStampFromFS); 545 } 546 547 private static long getLowestTimeStampFromFS(FileSystem fs, 548 final Collection<HStoreFile> candidates) throws IOException { 549 long minTs = Long.MAX_VALUE; 550 if (candidates.isEmpty()) { 551 return minTs; 552 } 553 Path[] p = new Path[candidates.size()]; 554 int i = 0; 555 for (HStoreFile sf : candidates) { 556 p[i] = sf.getPath(); 557 ++i; 558 } 559 560 FileStatus[] stats = fs.listStatus(p); 561 if (stats == null || stats.length == 0) { 562 return minTs; 563 } 564 for (FileStatus s : stats) { 565 minTs = Math.min(minTs, s.getModificationTime()); 566 } 567 return minTs; 568 } 569 570 ////////////////////////////////////////////////////////////////////////////// 571 // Get tests 572 ////////////////////////////////////////////////////////////////////////////// 573 574 private static final int BLOCKSIZE_SMALL = 8192; 575 576 /** 577 * Test for hbase-1686. 578 */ 579 @Test 580 public void testEmptyStoreFile() throws IOException { 581 init(this.name.getMethodName()); 582 // Write a store file. 583 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 584 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 585 flush(1); 586 // Now put in place an empty store file. Its a little tricky. Have to 587 // do manually with hacked in sequence id. 588 HStoreFile f = this.store.getStorefiles().iterator().next(); 589 Path storedir = f.getPath().getParent(); 590 long seqid = f.getMaxSequenceId(); 591 Configuration c = HBaseConfiguration.create(); 592 FileSystem fs = FileSystem.get(c); 593 HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 594 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs) 595 .withOutputDir(storedir).withFileContext(meta).build(); 596 w.appendMetadata(seqid + 1, false); 597 w.close(); 598 this.store.close(); 599 // Reopen it... 
// ... should pick up two files
    this.store =
      new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
    assertEquals(2, this.store.getStorefilesCount());

    // Only qf1 was both written and requested (qualifiers holds qf1, qf3, qf5), so one cell.
    result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers);
    assertEquals(1, result.size());
  }

  /**
   * Getting data from memstore only
   */
  @Test
  public void testGet_FromMemStoreOnly() throws IOException {
    init(this.name.getMethodName());

    // Put data in memstore
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);

    // Get
    result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers);

    // Compare
    assertCheck();
  }

  /**
   * Runs the time-range check for column families keeping 1, 3 and 5 versions.
   */
  @Test
  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
    // NOTE(review): each call re-runs init(...) with the same method name; the earlier
    // region/store instances do not appear to be closed -- confirm.
    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
  }

  /**
   * Writes {@code maxVersion + 1} versions of one cell, flushes, and checks the flushed file's
   * time range.
   * @param maxVersion maximum number of versions configured on the column family
   */
  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
    long currentTs = 100;
    long minTs = currentTs;
    // the extra cell won't be flushed to disk,
    // so the min of timerange will be different between memStore and hfile.
for (int i = 0; i != (maxVersion + 1); ++i) {
      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
      // The second cell written (i == 1) carries the timestamp expected as the file's minimum.
      if (i == 1) {
        minTs = currentTs;
      }
    }
    flushStore(store, id++);

    Collection<HStoreFile> files = store.getStorefiles();
    assertEquals(1, files.size());
    HStoreFile f = files.iterator().next();
    f.initReader();
    StoreFileReader reader = f.getReader();
    // The file's range must run from the second-written timestamp to the last-written one.
    assertEquals(minTs, reader.timeRange.getMin());
    assertEquals(currentTs, reader.timeRange.getMax());
  }

  /**
   * Getting data from files only
   */
  @Test
  public void testGet_FromFilesOnly() throws IOException {
    init(this.name.getMethodName());

    // Put data in memstore
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
    // flush
    flush(1);

    // Add more data
    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
    // flush
    flush(2);

    // Add more data
    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
    // flush
    flush(3);

    // Get
    result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers);
    // this.store.get(get, qualifiers, result);

    // Need to sort the result since multiple files
    Collections.sort(result, CellComparatorImpl.COMPARATOR);

    // Compare
    assertCheck();
  }

  /**
   * Getting data from memstore and files
   */
  @Test
  public void testGet_FromMemStoreAndFiles() throws IOException {
    init(this.name.getMethodName());

    // Put data in memstore
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
    // flush
708 flush(1); 709 710 // Add more data 711 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 712 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 713 // flush 714 flush(2); 715 716 // Add more data 717 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 718 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 719 720 // Get 721 result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers); 722 723 // Need to sort the result since multiple files 724 Collections.sort(result, CellComparatorImpl.COMPARATOR); 725 726 // Compare 727 assertCheck(); 728 } 729 730 private void flush(int storeFilessize) throws IOException { 731 flushStore(store, id++); 732 assertEquals(storeFilessize, this.store.getStorefiles().size()); 733 assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount()); 734 } 735 736 private void assertCheck() { 737 assertEquals(expected.size(), result.size()); 738 for (int i = 0; i < expected.size(); i++) { 739 assertEquals(expected.get(i), result.get(i)); 740 } 741 } 742 743 @After 744 public void tearDown() throws Exception { 745 EnvironmentEdgeManagerTestHelper.reset(); 746 if (store != null) { 747 try { 748 store.close(); 749 } catch (IOException e) { 750 } 751 store = null; 752 } 753 if (region != null) { 754 region.close(); 755 region = null; 756 } 757 } 758 759 @AfterClass 760 public static void tearDownAfterClass() throws IOException { 761 TEST_UTIL.cleanupTestDir(); 762 } 763 764 @Test 765 public void testHandleErrorsInFlush() throws Exception { 766 LOG.info("Setting up a faulty file system that cannot write"); 767 768 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 769 User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" }); 770 // Inject our faulty LocalFileSystem 771 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 772 
user.runAs(new PrivilegedExceptionAction<Object>() { 773 @Override 774 public Object run() throws Exception { 775 // Make sure it worked (above is sensitive to caching details in hadoop core) 776 FileSystem fs = FileSystem.get(conf); 777 assertEquals(FaultyFileSystem.class, fs.getClass()); 778 779 // Initialize region 780 init(name.getMethodName(), conf); 781 782 LOG.info("Adding some data"); 783 store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 784 store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 785 store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 786 787 LOG.info("Before flush, we should have no files"); 788 789 Collection<StoreFileInfo> files = 790 store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 791 assertEquals(0, files != null ? files.size() : 0); 792 793 // flush 794 try { 795 LOG.info("Flushing"); 796 flush(1); 797 fail("Didn't bubble up IOE!"); 798 } catch (IOException ioe) { 799 assertTrue(ioe.getMessage().contains("Fault injected")); 800 } 801 802 LOG.info("After failed flush, we should still have no files!"); 803 files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 804 assertEquals(0, files != null ? files.size() : 0); 805 store.getHRegion().getWAL().close(); 806 return null; 807 } 808 }); 809 FileSystem.closeAllForUGI(user.getUGI()); 810 } 811 812 /** 813 * Faulty file system that will fail if you write past its fault position the FIRST TIME only; 814 * thereafter it will succeed. Used by {@link TestHRegion} too. 
815 */ 816 static class FaultyFileSystem extends FilterFileSystem { 817 List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>(); 818 private long faultPos = 200; 819 AtomicBoolean fault = new AtomicBoolean(true); 820 821 public FaultyFileSystem() { 822 super(new LocalFileSystem()); 823 LOG.info("Creating faulty!"); 824 } 825 826 @Override 827 public FSDataOutputStream create(Path p) throws IOException { 828 return new FaultyOutputStream(super.create(p), faultPos, fault); 829 } 830 831 @Override 832 public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, 833 int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { 834 return new FaultyOutputStream( 835 super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress), 836 faultPos, fault); 837 } 838 839 @Override 840 public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize, 841 short replication, long blockSize, Progressable progress) throws IOException { 842 // Fake it. Call create instead. The default implementation throws an IOE 843 // that this is not supported. 
844 return create(f, overwrite, bufferSize, replication, blockSize, progress); 845 } 846 } 847 848 static class FaultyOutputStream extends FSDataOutputStream { 849 volatile long faultPos = Long.MAX_VALUE; 850 private final AtomicBoolean fault; 851 852 public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault) 853 throws IOException { 854 super(out, null); 855 this.faultPos = faultPos; 856 this.fault = fault; 857 } 858 859 @Override 860 public synchronized void write(byte[] buf, int offset, int length) throws IOException { 861 LOG.info("faulty stream write at pos " + getPos()); 862 injectFault(); 863 super.write(buf, offset, length); 864 } 865 866 private void injectFault() throws IOException { 867 if (this.fault.get() && getPos() >= faultPos) { 868 throw new IOException("Fault injected"); 869 } 870 } 871 } 872 873 private static StoreFlushContext flushStore(HStore store, long id) throws IOException { 874 StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY); 875 storeFlushCtx.prepare(); 876 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 877 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 878 return storeFlushCtx; 879 } 880 881 /** 882 * Generate a list of KeyValues for testing based on given parameters 883 * @return the rows key-value list 884 */ 885 private List<Cell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier, 886 byte[] family) { 887 List<Cell> kvList = new ArrayList<>(); 888 for (int i = 1; i <= numRows; i++) { 889 byte[] b = Bytes.toBytes(i); 890 for (long timestamp : timestamps) { 891 kvList.add(new KeyValue(b, family, qualifier, timestamp, b)); 892 } 893 } 894 return kvList; 895 } 896 897 /** 898 * Test to ensure correctness when using Stores with multiple timestamps 899 */ 900 @Test 901 public void testMultipleTimestamps() throws IOException { 902 int numRows = 1; 903 long[] timestamps1 = new long[] { 1, 5, 10, 20 }; 904 long[] timestamps2 = 
new long[] { 30, 80 }; 905 906 init(this.name.getMethodName()); 907 908 List<Cell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family); 909 for (Cell kv : kvList1) { 910 this.store.add(kv, null); 911 } 912 913 flushStore(store, id++); 914 915 List<Cell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family); 916 for (Cell kv : kvList2) { 917 this.store.add(kv, null); 918 } 919 920 List<Cell> result; 921 Get get = new Get(Bytes.toBytes(1)); 922 get.addColumn(family, qf1); 923 924 get.setTimeRange(0, 15); 925 result = HBaseTestingUtility.getFromStoreFile(store, get); 926 assertTrue(result.size() > 0); 927 928 get.setTimeRange(40, 90); 929 result = HBaseTestingUtility.getFromStoreFile(store, get); 930 assertTrue(result.size() > 0); 931 932 get.setTimeRange(10, 45); 933 result = HBaseTestingUtility.getFromStoreFile(store, get); 934 assertTrue(result.size() > 0); 935 936 get.setTimeRange(80, 145); 937 result = HBaseTestingUtility.getFromStoreFile(store, get); 938 assertTrue(result.size() > 0); 939 940 get.setTimeRange(1, 2); 941 result = HBaseTestingUtility.getFromStoreFile(store, get); 942 assertTrue(result.size() > 0); 943 944 get.setTimeRange(90, 200); 945 result = HBaseTestingUtility.getFromStoreFile(store, get); 946 assertTrue(result.size() == 0); 947 } 948 949 /** 950 * Test for HBASE-3492 - Test split on empty colfam (no store files). 951 * @throws IOException When the IO operations fail. 
*/
  @Test
  public void testSplitWithEmptyColFam() throws IOException {
    init(this.name.getMethodName());
    assertFalse(store.getSplitPoint().isPresent());
  }

  /**
   * Verifies which configuration source the store uses for the compaction throttle: the value
   * set on the {@link Configuration}, then a value set on the table descriptor, then a value set
   * on the column family descriptor. After each {@code init} the store must throttle
   * {@code anyValue + 1} but not {@code anyValue}.
   */
  @Test
  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
    long anyValue = 10;

    // We'll check that it uses correct config and propagates it appropriately by going thru
    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
    // a number we pass in is higher than some config value, inside compactionPolicy.
    Configuration conf = HBaseConfiguration.create();
    conf.setLong(CONFIG_KEY, anyValue);
    init(name.getMethodName() + "-xml", conf);
    assertTrue(store.throttleCompaction(anyValue + 1));
    assertFalse(store.throttleCompaction(anyValue));

    // HTD overrides XML.
    --anyValue;
    init(
      name.getMethodName() + "-htd", conf, TableDescriptorBuilder
        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
      ColumnFamilyDescriptorBuilder.of(family));
    assertTrue(store.throttleCompaction(anyValue + 1));
    assertFalse(store.throttleCompaction(anyValue));

    // HCD overrides them both.
    --anyValue;
    init(name.getMethodName() + "-hcd", conf,
      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
        Long.toString(anyValue)),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
        .build());
    assertTrue(store.throttleCompaction(anyValue + 1));
    assertFalse(store.throttleCompaction(anyValue));
  }

  /**
   * Store engine that records the compactor it created in a static field so that a test can
   * compare it with the compactor the store actually uses.
   */
  public static class DummyStoreEngine extends DefaultStoreEngine {
    public static DefaultCompactor lastCreatedCompactor = null;

    @Override
    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
      throws IOException {
      super.createComponents(conf, store, comparator);
      lastCreatedCompactor = this.compactor;
    }
  }

  /**
   * Verifies that the store engine class named under {@code StoreEngine.STORE_ENGINE_CLASS_KEY}
   * is the one the store uses: the compactor recorded by {@link DummyStoreEngine} must equal the
   * compactor of the store's engine.
   */
  @Test
  public void testStoreUsesSearchEngineOverride() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
    init(this.name.getMethodName(), conf);
    assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor());
  }

  /**
   * Writes an additional store file, containing only metadata, into the directory of the store's
   * first store file. The file is written directly with a {@code StoreFileWriter}; callers in
   * this class invoke {@code refreshStoreFiles()} afterwards before expecting the store's file
   * count to change. Requires the store to already have at least one store file.
   */
  private void addStoreFile() throws IOException {
    HStoreFile f = this.store.getStorefiles().iterator().next();
    Path storedir = f.getPath().getParent();
    long seqid = this.store.getMaxSequenceId().orElse(0L);
    Configuration c = TEST_UTIL.getConfiguration();
    FileSystem fs = FileSystem.get(c);
    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
      .withOutputDir(storedir).withFileContext(fileContext).build();
    // the new file gets a sequence id one above the store's current maximum
    w.appendMetadata(seqid + 1, false);
    w.close();
    LOG.info("Added store file:" + w.getPath());
  }

  /**
   * Removes, through the region file system, the store file found at the given position of the
   * iteration order of {@code store.getStorefiles()}.
   * @param index zero-based position of the store file to remove
   */
  private void archiveStoreFile(int index) throws IOException {
    Collection<HStoreFile> files = this.store.getStorefiles();
    HStoreFile sf = null;
    Iterator<HStoreFile> it = files.iterator();
    // advance the iterator index + 1 times; sf ends up as the element at position index
    for (int i = 0; i <= index; i++) {
      sf = it.next();
    }
    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(),
      Lists.newArrayList(sf));
  }

  /**
   * Closes the compacted file at the given position of the store file manager's compacted-file
   * collection and removes it from that collection. Does nothing when there are no compacted
   * files.
   * @param index zero-based position of the compacted file to close
   */
  private void closeCompactedFile(int index) throws IOException {
    Collection<HStoreFile> files =
      this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    if (files.size() > 0) {
      HStoreFile sf = null;
      Iterator<HStoreFile> it = files.iterator();
      for (int i = 0; i <= index; i++) {
        sf = it.next();
      }
      sf.closeStoreFile(true);
      store.getStoreEngine().getStoreFileManager()
        .removeCompactedFiles(Collections.singletonList(sf));
    }
  }

  /**
   * Verifies that {@code refreshStoreFiles()} picks up store files that were added to or removed
   * from the file system behind the store's back. Before each refresh the store file count must
   * still be the old one; after the refresh it must reflect the change.
   */
  @Test
  public void testRefreshStoreFiles() throws Exception {
    init(name.getMethodName());

    assertEquals(0, this.store.getStorefilesCount());

    // Test refreshing store files when no store files are there
    store.refreshStoreFiles();
    assertEquals(0, this.store.getStorefilesCount());

    // add some data, flush
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    flush(1);
    assertEquals(1, this.store.getStorefilesCount());

    // add one more file
    addStoreFile();

    assertEquals(1, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(2, this.store.getStorefilesCount());

    // add three more files
    addStoreFile();
    addStoreFile();
    addStoreFile();

    assertEquals(2, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(5, this.store.getStorefilesCount());

    // remove one file
    closeCompactedFile(0);
    archiveStoreFile(0);

    assertEquals(5, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(4, this.store.getStorefilesCount());

    // remove three more files
    archiveStoreFile(0);
    archiveStoreFile(1);
    archiveStoreFile(2);

    assertEquals(4, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(1, this.store.getStorefilesCount());

    // remove the last file
    archiveStoreFile(0);
    store.refreshStoreFiles();
    assertEquals(0, this.store.getStorefilesCount());
  }

  /**
   * Verifies that refreshing the store files a second time, with no change on the file system in
   * between, does not call {@code replaceStoreFiles} again on the (spied) store engine.
   */
  @Test
  public void testRefreshStoreFilesNotChanged() throws IOException {
    init(name.getMethodName());

    assertEquals(0, this.store.getStorefilesCount());

    // add some data, flush
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    flush(1);
    // add one more file
    addStoreFile();

    StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());

    // call first time after files changed
    spiedStoreEngine.refreshStoreFiles();
    assertEquals(2, this.store.getStorefilesCount());
    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());

    // call second time
    spiedStoreEngine.refreshStoreFiles();

    // ensure that replaceStoreFiles is not called again, i.e. the invocation count is still 1,
    // when the files have not changed
    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
  }

  /**
   * Creates a scanner and runs a compaction on two pool threads after three flushes, then checks
   * that every {@code StoreFileScanner} held by the scanner has a reader ref count of 1 and that
   * the scanner can still seek.
   */
  @Test
  public void testScanWithCompactionAfterFlush() throws Exception {
    TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
      EverythingPolicy.class.getName());
    init(name.getMethodName());

    assertEquals(0, this.store.getStorefilesCount());

    KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null);
    // add some data, flush
    this.store.add(kv, null);
    flush(1);
    kv = new KeyValue(row, family, qf2, 1, (byte[]) null);
    // add some data, flush
    this.store.add(kv, null);
    flush(2);
    kv = new KeyValue(row, family, qf3, 1, (byte[]) null);
    // add some data, flush
    this.store.add(kv, null);
    flush(3);

    ExecutorService service = Executors.newFixedThreadPool(2);

    Scan scan = new Scan(new Get(row));
    Future<KeyValueScanner> scanFuture = service.submit(() -> {
      try {
        LOG.info(">>>> creating scanner");
        return this.store.createScanner(scan,
          new ScanInfo(HBaseConfiguration.create(),
            ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(),
            Long.MAX_VALUE, 0, CellComparator.getInstance()),
          scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0);
      } catch (IOException e) {
        // NOTE(review): a failure here is only printed; the null result then fails the cast
        // and dereference below rather than reporting the original exception.
        e.printStackTrace();
        return null;
      }
    });
    Future compactFuture = service.submit(() -> {
      try {
        LOG.info(">>>>>> starting compaction");
        Optional<CompactionContext> opCompaction = this.store.requestCompaction();
        assertTrue(opCompaction.isPresent());
        store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent());
        LOG.info(">>>>>> Compaction is finished");
        this.store.closeAndArchiveCompactedFiles();
        LOG.info(">>>>>> Compacted files deleted");
      } catch (IOException e) {
        // NOTE(review): an IOException from the compaction is printed but does not fail the test.
        e.printStackTrace();
      }
    });

    // wait for both tasks before inspecting the scanner
    KeyValueScanner kvs = scanFuture.get();
    compactFuture.get();
    ((StoreScanner) kvs).currentScanners.forEach(s -> {
      if (s instanceof StoreFileScanner) {
        assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount());
      }
    });
    kvs.seek(kv);
    service.shutdownNow();
  }

  /**
   * Counts the scanners in {@code scanner.currentScanners} that are not file scanners.
   * @return the count, or 0 when {@code currentScanners} is null
   */
  private long countMemStoreScanner(StoreScanner scanner) {
    if (scanner.currentScanners == null) {
      return 0;
    }
    return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count();
  }

  /**
   * Runs the memstore-scanner-count check three times: with no cell, one cell, and two cells
   * (the second in a different row) added after the snapshot has been prepared.
   */
  @Test
  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
    long seqId = 100;
    long timestamp = EnvironmentEdgeManager.currentTime();
    Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
      .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
    PrivateCellUtil.setSequenceId(cell0, seqId);
    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());

    Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
      .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
    PrivateCellUtil.setSequenceId(cell1, seqId);
    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));

    seqId = 101;
    timestamp = EnvironmentEdgeManager.currentTime();
    Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
      .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
    PrivateCellUtil.setSequenceId(cell2, seqId);
    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
  }

  /**
   * Adds the first list of cells, prepares a flush, adds the second list, opens a scanner and
   * then completes the flush while the scanner is open. Asserts the number of non-file scanners
   * before the flush, after each {@code next} call, and at the end, as well as the total number
   * of cells returned.
   * @param inputCellsBeforeSnapshot cells added before the flush is prepared
   * @param inputCellsAfterSnapshot  cells added after the flush is prepared
   */
  private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
    List<Cell> inputCellsAfterSnapshot) throws IOException {
    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    // collect every qualifier and the highest sequence id across both lists
    long seqId = Long.MIN_VALUE;
    for (Cell c : inputCellsBeforeSnapshot) {
      quals.add(CellUtil.cloneQualifier(c));
      seqId = Math.max(seqId, c.getSequenceId());
    }
    for (Cell c : inputCellsAfterSnapshot) {
      quals.add(CellUtil.cloneQualifier(c));
      seqId = Math.max(seqId, c.getSequenceId());
    }
    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();
    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
      // snapshot has no data after flush
      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
      boolean more;
      int cellCount = 0;
      do {
        List<Cell> cells = new ArrayList<>();
        more = s.next(cells);
        cellCount += cells.size();
        // once next() reports no more rows, no non-file scanner may remain
        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
      } while (more);
      assertEquals(
        "The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
        inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
      // the current scanners is cleared
      assertEquals(0, countMemStoreScanner(s));
    }
  }

  /** Creates a Put cell in the default test row; see the five-argument overload. */
  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
    throws IOException {
    return createCell(row, qualifier, ts, sequenceId, value);
  }

  /**
   * Creates a deep-copy Put cell in the test column family with the given row, qualifier,
   * timestamp and value, and assigns it the given sequence id.
   */
  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
    throws IOException {
    Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
      .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put).setValue(value).build();
    PrivateCellUtil.setSequenceId(c, sequenceId);
    return c;
  }

  /**
   * Flushes the store when the result list reaches two cells, using a filter that includes
   * every cell; three cells are expected for the first row.
   */
  @Test
  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
    final int expectedSize = 3;
    testFlushBeforeCompletingScan(new MyListHook() {
      @Override
      public void hook(int currentSize) {
        if (currentSize == expectedSize - 1) {
          try {
            flushStore(store, id++);
            timeToGoNextRow.set(true);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }, new FilterBase() {
      @Override
      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
        return ReturnCode.INCLUDE;
      }
    }, expectedSize);
  }

  /**
   * Flushes the store when the result list reaches one cell; the filter then answers
   * {@code NEXT_ROW} once for the following cell. Two cells are expected for the first row.
   */
  @Test
  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
    final int expectedSize = 2;
    testFlushBeforeCompletingScan(new MyListHook() {
      @Override
      public void hook(int currentSize) {
        if (currentSize == expectedSize - 1) {
          try {
            flushStore(store, id++);
            timeToGoNextRow.set(true);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }, new FilterBase() {
      @Override
      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
        if (timeToGoNextRow.get()) {
          // one-shot: reset the flag so that later cells are included again
          timeToGoNextRow.set(false);
          return ReturnCode.NEXT_ROW;
        } else {
          return ReturnCode.INCLUDE;
        }
      }
    }, expectedSize);
  }

  /**
   * Flushes the store when the result list reaches one cell; the filter then answers
   * {@code SEEK_NEXT_USING_HINT} once, with the current cell itself as the hint. Two cells are
   * expected for the first row.
   */
  @Test
  public void testFlushBeforeCompletingScanWithFilterHint()
    throws IOException, InterruptedException {
    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
    final int expectedSize = 2;
    testFlushBeforeCompletingScan(new MyListHook() {
      @Override
      public void hook(int currentSize) {
        if (currentSize == expectedSize - 1) {
          try {
            flushStore(store, id++);
            timeToGetHint.set(true);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }, new FilterBase() {
      @Override
      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
        if (timeToGetHint.get()) {
          // one-shot: reset the flag so that later cells are included again
          timeToGetHint.set(false);
          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
        } else {
          return Filter.ReturnCode.INCLUDE;
        }
      }

      @Override
      public Cell getNextCellHint(Cell currentCell) throws IOException {
        return currentCell;
      }
    }, expectedSize);
  }

  /**
   * Shared driver for the flush-before-completing-scan tests. Populates rows row0, row1 and row2
   * (row1 twice, the second time with a newer timestamp and sequence id), scans from row1 with
   * the given filter into a {@code MyList} that invokes {@code hook}, and asserts that the first
   * {@code next} call yields {@code expectedSize} cells with value1 and the second yields three
   * cells with value2.
   * @param hook         callback handed to the {@code MyList} that collects the first row
   * @param filter       filter installed on the scan
   * @param expectedSize number of cells expected from the first {@code next} call
   */
  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
    throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    byte[] r0 = Bytes.toBytes("row0");
    byte[] r1 = Bytes.toBytes("row1");
    byte[] r2 = Bytes.toBytes("row2");
    byte[] value0 = Bytes.toBytes("value0");
    byte[] value1 = Bytes.toBytes("value1");
    byte[] value2 = Bytes.toBytes("value2");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
      new MyStoreHook() {
        @Override
        public long getSmallestReadPoint(HStore store) {
          return seqId + 3;
        }
      });
    // The cells having the value0 won't be flushed to disk because the value of max version is 1
    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
    // second version of row1, same value but newer timestamp and sequence id
    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
    List<Cell> myList = new MyList<>(hook);
    Scan scan = new Scan().withStartRow(r1).setFilter(filter);
    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
      // r1
      scanner.next(myList);
      assertEquals(expectedSize, myList.size());
      for (Cell c : myList) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:"
          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
      }
      List<Cell> normalList = new ArrayList<>(3);
      // r2
      scanner.next(normalList);
      assertEquals(3, normalList.size());
      for (Cell c : normalList) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value2) + ", actual:"
          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value2));
      }
    }
  }

  /**
   * Prepares a flush on a background thread while the test thread creates a scanner, using
   * {@code MyCompactingMemStore} as the memstore implementation, and asserts that the scanner
   * still returns the three cells that were added beforehand. The phase numbers in the comments
   * describe the intended interleaving of the two threads.
   */
  @Test
  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
    byte[] value = Bytes.toBytes("value");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    // data written before the flush is prepared; the scan below expects all three cells back
    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    MyCompactingMemStore.START_TEST.set(true);
    Runnable flush = () -> {
      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
      // recreate the active memstore -- phase (4/5)
      storeFlushCtx.prepare();
    };
    ExecutorService service = Executors.newSingleThreadExecutor();
    service.execute(flush);
    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
    // this is blocked until we recreate the active memstore -- phase (3/5)
    // we get scanner from active memstore but it is empty -- phase (5/5)
    InternalScanner scanner =
      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
    service.shutdown();
    // NOTE(review): the boolean result is ignored, so a timeout here goes unnoticed
    service.awaitTermination(20, TimeUnit.SECONDS);
    try {
      try {
        List<Cell> results = new ArrayList<>();
        scanner.next(results);
        assertEquals(3, results.size());
        for (Cell c : results) {
          byte[] actualValue = CellUtil.cloneValue(c);
          assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
            + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
        }
      } finally {
        scanner.close();
      }
    } finally {
      // always reset the static switch and complete the flush that was prepared above
      MyCompactingMemStore.START_TEST.set(false);
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
    }
  }

  /**
   * Verifies that a scanner returns the current values when a second flush is triggered from
   * the {@code getScanners} hook while a first flush is being completed. The phase numbers in
   * the comments describe the intended order of events.
   */
  @Test
  public void testScanWithDoubleFlush() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Initialize region
    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
      @Override
      public void getScanners(MyStore store) throws IOException {
        final long tmpId = id++;
        ExecutorService s = Executors.newSingleThreadExecutor();
        s.execute(() -> {
          try {
            // flush the store before storescanner updates the scanners from store.
            // The current data will be flushed into files, and the memstore will
            // be clear.
            // -- phase (4/4)
            flushStore(store, tmpId);
          } catch (IOException ex) {
            throw new RuntimeException(ex);
          }
        });
        s.shutdown();
        try {
          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
          s.awaitTermination(3, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
          // NOTE(review): the interrupt is swallowed without restoring the interrupt flag
        }
      }
    });
    byte[] oldValue = Bytes.toBytes("oldValue");
    byte[] currentValue = Bytes.toBytes("currentValue");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    // older data which shouldn't be "seen" by client
    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
    long snapshotId = id++;
    // push older data into snapshot -- phase (1/4)
    StoreFlushContext storeFlushCtx =
      store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();

    // insert current data into active -- phase (2/4)
    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);
    try (InternalScanner scanner =
      (InternalScanner) myStore.getScanner(new Scan(new Get(row)), quals, seqId + 1)) {
      // complete the flush -- phase (3/4)
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));

      List<Cell> results = new ArrayList<>();
      scanner.next(results);
      assertEquals(3, results.size());
      for (Cell c : results) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(currentValue) + ", actual:"
          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, currentValue));
      }
    }
  }

  /**
   * This test is for HBASE-27519: while the {@link StoreScanner} is scanning, the flush and the
   * compaction execute concurrently, and the compaction compacts and archives the flushed
   * {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}. Before HBASE-27519,
   * {@link StoreScanner#updateReaders} would throw {@link FileNotFoundException}.
   */
  @Test
  public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(WALFactory.WAL_ENABLED, false);
    conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
    byte[] r0 = Bytes.toBytes("row0");
    byte[] r1 = Bytes.toBytes("row1");
    // two parties: the thread running the getScanners hook and the compaction thread
    final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
    final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
    // Initialize region
    final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
      @Override
      public void getScanners(MyStore store) throws IOException {
        try {
          // Here this method is called by StoreScanner.updateReaders which is invoked by the
          // following TestHStore.flushStore
          if (shouldWaitRef.get()) {
            // wait the following compaction Task start
            cyclicBarrier.await();
            // wait the following HStore.closeAndArchiveCompactedFiles end.
            cyclicBarrier.await();
          }
        } catch (BrokenBarrierException | InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
    });

    final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null);
    Runnable compactionTask = () -> {
      try {
        // Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for
        // entering the MyStore.getScanners, compactionTask could start.
        cyclicBarrier.await();
        region.compactStore(family, new NoLimitThroughputController());
        myStore.closeAndArchiveCompactedFiles();
        // Notify StoreScanner.updateReaders could enter MyStore.getScanners.
        cyclicBarrier.await();
      } catch (Throwable e) {
        // recorded here and asserted to be null at the end of the test
        compactionExceptionRef.set(e);
      }
    };

    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    byte[] value = Bytes.toBytes("value");
    // row0: qf1 and qf2 are each flushed right after being added, qf3 is added without a flush
    myStore.add(createCell(r0, qf1, ts, seqId, value), null);
    flushStore(myStore, id++);
    myStore.add(createCell(r0, qf2, ts, seqId, value), null);
    flushStore(myStore, id++);
    myStore.add(createCell(r0, qf3, ts, seqId, value), null);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);

    // row1: all three cells are added without a flush
    myStore.add(createCell(r1, qf1, ts, seqId, value), null);
    myStore.add(createCell(r1, qf2, ts, seqId, value), null);
    myStore.add(createCell(r1, qf3, ts, seqId, value), null);

    Thread.currentThread()
      .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
    Scan scan = new Scan();
    scan.withStartRow(r0, true);
    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
      List<Cell> results = new MyList<>(size -> {
        switch (size) {
          case 1:
            // start the compaction thread and flush from the scanning thread; the hook above
            // synchronizes the two through the barrier while shouldWaitRef is set
            shouldWaitRef.set(true);
            Thread thread = new Thread(compactionTask);
            thread.setName("MyCompacting Thread.");
            thread.start();
            try {
              flushStore(myStore, id++);
              thread.join();
            } catch (IOException | InterruptedException e) {
              throw new RuntimeException(e);
            }
            shouldWaitRef.set(false);
            break;
          default:
            break;
        }
      });
      // Before HBASE-27519, here would throw java.io.FileNotFoundException because the storeFile
      // which used by StoreScanner.updateReaders is deleted by compactionTask.
      scanner.next(results);
      // The results is r0 row cells.
      assertEquals(3, results.size());
      assertTrue(compactionExceptionRef.get() == null);
    }
  }

  /**
   * Flushes the store after the first cell has been collected and adds new cells after the
   * second, then asserts that the scanner still returns the three original cells with their
   * original value.
   */
  @Test
  public void testReclaimChunkWhenScaning() throws IOException {
    init("testReclaimChunkWhenScaning");
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    byte[] value = Bytes.toBytes("value");
    // original data; these are the three cells the scan below is expected to return
    store.add(createCell(qf1, ts, seqId, value), null);
    store.add(createCell(qf2, ts, seqId, value), null);
    store.add(createCell(qf3, ts, seqId, value), null);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);
    try (InternalScanner scanner =
      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) {
      List<Cell> results = new MyList<>(size -> {
        switch (size) {
          // 1) we get the first cell (qf1)
          // 2) flush the data to have StoreScanner update inner scanners
          // 3) the chunk will be reclaimed after updating
          case 1:
            try {
              flushStore(store, id++);
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
            break;
          // 1) we get the second cell (qf2)
          // 2) add some cell to fill some byte into the chunk (we have only one chunk)
          case 2:
            try {
              byte[] newValue = Bytes.toBytes("newValue");
              // newer data (seqId + 1) which shouldn't be "seen" by the scan opened with seqId;
              // the assertions below expect only the original value
              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
            break;
          default:
            break;
        }
      });
      scanner.next(results);
      assertEquals(3, results.size());
      for (Cell c : results) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
      }
    }
  }

  /**
   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the
   * versionedList. And the first InMemoryFlushRunnable will use the changed versionedList to remove
   * the corresponding segments. In short, there will be some segments which isn't in merge are
   * removed.
1702 */ 1703 @Test 1704 public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException { 1705 int flushSize = 500; 1706 Configuration conf = HBaseConfiguration.create(); 1707 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName()); 1708 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25); 1709 MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0); 1710 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize)); 1711 // Set the lower threshold to invoke the "MERGE" policy 1712 conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0)); 1713 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1714 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1715 byte[] value = Bytes.toBytes("thisisavarylargevalue"); 1716 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1717 long ts = EnvironmentEdgeManager.currentTime(); 1718 long seqId = 100; 1719 // older data whihc shouldn't be "seen" by client 1720 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1721 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1722 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1723 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1724 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1725 storeFlushCtx.prepare(); 1726 // This shouldn't invoke another in-memory flush because the first compactor thread 1727 // hasn't accomplished the in-memory compaction. 1728 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1729 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1730 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1731 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1732 // okay. 
Let the compaction be completed 1733 MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown(); 1734 CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore; 1735 while (mem.isMemStoreFlushingInMemory()) { 1736 TimeUnit.SECONDS.sleep(1); 1737 } 1738 // This should invoke another in-memory flush. 1739 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1740 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1741 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1742 assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1743 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1744 String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE)); 1745 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1746 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1747 } 1748 1749 @Test 1750 public void testAge() throws IOException { 1751 long currentTime = EnvironmentEdgeManager.currentTime(); 1752 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 1753 edge.setValue(currentTime); 1754 EnvironmentEdgeManager.injectEdge(edge); 1755 Configuration conf = TEST_UTIL.getConfiguration(); 1756 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family); 1757 initHRegion(name.getMethodName(), conf, 1758 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false); 1759 HStore store = new HStore(region, hcd, conf, false) { 1760 1761 @Override 1762 protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf, 1763 CellComparator kvComparator) throws IOException { 1764 List<HStoreFile> storefiles = 1765 Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100), 1766 mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000)); 1767 StoreFileManager sfm = mock(StoreFileManager.class); 1768 when(sfm.getStorefiles()).thenReturn(storefiles); 1769 StoreEngine<?, ?, ?, ?> storeEngine = 
mock(StoreEngine.class); 1770 when(storeEngine.getStoreFileManager()).thenReturn(sfm); 1771 return storeEngine; 1772 } 1773 }; 1774 assertEquals(10L, store.getMinStoreFileAge().getAsLong()); 1775 assertEquals(10000L, store.getMaxStoreFileAge().getAsLong()); 1776 assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4); 1777 } 1778 1779 private HStoreFile mockStoreFile(long createdTime) { 1780 StoreFileInfo info = mock(StoreFileInfo.class); 1781 when(info.getCreatedTimestamp()).thenReturn(createdTime); 1782 HStoreFile sf = mock(HStoreFile.class); 1783 when(sf.getReader()).thenReturn(mock(StoreFileReader.class)); 1784 when(sf.isHFile()).thenReturn(true); 1785 when(sf.getFileInfo()).thenReturn(info); 1786 return sf; 1787 } 1788 1789 private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) 1790 throws IOException { 1791 return (MyStore) init(methodName, conf, 1792 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1793 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook); 1794 } 1795 1796 private static class MyStore extends HStore { 1797 private final MyStoreHook hook; 1798 1799 MyStore(final HRegion region, final ColumnFamilyDescriptor family, 1800 final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException { 1801 super(region, family, confParam, false); 1802 this.hook = hook; 1803 } 1804 1805 @Override 1806 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1807 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, 1808 boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 1809 boolean includeMemstoreScanner) throws IOException { 1810 hook.getScanners(this); 1811 return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, 1812 stopRow, false, readPt, includeMemstoreScanner); 1813 } 1814 1815 @Override 1816 public 
long getSmallestReadPoint() { 1817 return hook.getSmallestReadPoint(this); 1818 } 1819 } 1820 1821 private abstract static class MyStoreHook { 1822 1823 void getScanners(MyStore store) throws IOException { 1824 } 1825 1826 long getSmallestReadPoint(HStore store) { 1827 return store.getHRegion().getSmallestReadPoint(); 1828 } 1829 } 1830 1831 @Test 1832 public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception { 1833 Configuration conf = HBaseConfiguration.create(); 1834 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); 1835 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); 1836 // Set the lower threshold to invoke the "MERGE" policy 1837 MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() { 1838 }); 1839 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1840 long ts = EnvironmentEdgeManager.currentTime(); 1841 long seqID = 1L; 1842 // Add some data to the region and do some flushes 1843 for (int i = 1; i < 10; i++) { 1844 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1845 memStoreSizing); 1846 } 1847 // flush them 1848 flushStore(store, seqID); 1849 for (int i = 11; i < 20; i++) { 1850 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1851 memStoreSizing); 1852 } 1853 // flush them 1854 flushStore(store, seqID); 1855 for (int i = 21; i < 30; i++) { 1856 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1857 memStoreSizing); 1858 } 1859 // flush them 1860 flushStore(store, seqID); 1861 1862 assertEquals(3, store.getStorefilesCount()); 1863 Scan scan = new Scan(); 1864 scan.addFamily(family); 1865 Collection<HStoreFile> storefiles2 = store.getStorefiles(); 1866 ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2); 1867 StoreScanner storeScanner = 1868 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); 
1869 // get the current heap 1870 KeyValueHeap heap = storeScanner.heap; 1871 // create more store files 1872 for (int i = 31; i < 40; i++) { 1873 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1874 memStoreSizing); 1875 } 1876 // flush them 1877 flushStore(store, seqID); 1878 1879 for (int i = 41; i < 50; i++) { 1880 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1881 memStoreSizing); 1882 } 1883 // flush them 1884 flushStore(store, seqID); 1885 storefiles2 = store.getStorefiles(); 1886 ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2); 1887 actualStorefiles1.removeAll(actualStorefiles); 1888 // Do compaction 1889 MyThread thread = new MyThread(storeScanner); 1890 thread.start(); 1891 store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false); 1892 thread.join(); 1893 KeyValueHeap heap2 = thread.getHeap(); 1894 assertFalse(heap.equals(heap2)); 1895 } 1896 1897 @Test 1898 public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception { 1899 Configuration conf = HBaseConfiguration.create(); 1900 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); 1901 // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type. 1902 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1); 1903 MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() { 1904 }); 1905 Scan scan = new Scan(); 1906 scan.addFamily(family); 1907 // ReadType on Scan is still DEFAULT only. 
1908 assertEquals(ReadType.DEFAULT, scan.getReadType()); 1909 StoreScanner storeScanner = 1910 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); 1911 assertFalse(storeScanner.isScanUsePread()); 1912 } 1913 1914 @Test 1915 public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException { 1916 Configuration conf = HBaseConfiguration.create(); 1917 conf.set("hbase.systemtables.compacting.memstore.type", "eager"); 1918 init(name.getMethodName(), conf, 1919 TableDescriptorBuilder.newBuilder( 1920 TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())), 1921 ColumnFamilyDescriptorBuilder.newBuilder(family) 1922 .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build()); 1923 assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString() 1924 .startsWith("eager".toUpperCase())); 1925 } 1926 1927 @Test 1928 public void testSpaceQuotaChangeAfterReplacement() throws IOException { 1929 final TableName tn = TableName.valueOf(name.getMethodName()); 1930 init(name.getMethodName()); 1931 1932 RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl(); 1933 1934 HStoreFile sf1 = mockStoreFileWithLength(1024L); 1935 HStoreFile sf2 = mockStoreFileWithLength(2048L); 1936 HStoreFile sf3 = mockStoreFileWithLength(4096L); 1937 HStoreFile sf4 = mockStoreFileWithLength(8192L); 1938 1939 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a")) 1940 .setEndKey(Bytes.toBytes("b")).build(); 1941 1942 // Compacting two files down to one, reducing size 1943 sizeStore.put(regionInfo, 1024L + 4096L); 1944 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3), 1945 Arrays.asList(sf2)); 1946 1947 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 1948 1949 // The same file length in and out should have no change 1950 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, 
Arrays.asList(sf2), 1951 Arrays.asList(sf2)); 1952 1953 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 1954 1955 // Increase the total size used 1956 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2), 1957 Arrays.asList(sf3)); 1958 1959 assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize()); 1960 1961 RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b")) 1962 .setEndKey(Bytes.toBytes("c")).build(); 1963 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4)); 1964 1965 assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize()); 1966 } 1967 1968 @Test 1969 public void testHFileContextSetWithCFAndTable() throws Exception { 1970 init(this.name.getMethodName()); 1971 StoreFileWriter writer = store.getStoreEngine() 1972 .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L) 1973 .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true) 1974 .includesTag(false).shouldDropBehind(true)); 1975 HFileContext hFileContext = writer.getHFileWriter().getFileContext(); 1976 assertArrayEquals(family, hFileContext.getColumnFamily()); 1977 assertArrayEquals(table, hFileContext.getTableName()); 1978 } 1979 1980 // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell 1981 // but its dataSize exceeds inmemoryFlushSize 1982 @Test 1983 public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize() 1984 throws IOException, InterruptedException { 1985 Configuration conf = HBaseConfiguration.create(); 1986 1987 byte[] smallValue = new byte[3]; 1988 byte[] largeValue = new byte[9]; 1989 final long timestamp = EnvironmentEdgeManager.currentTime(); 1990 final long seqId = 100; 1991 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 1992 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 1993 int smallCellByteSize = 
MutableSegment.getCellLength(smallCell); 1994 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 1995 int flushByteSize = smallCellByteSize + largeCellByteSize - 2; 1996 1997 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 1998 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName()); 1999 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2000 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2001 2002 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2003 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2004 2005 MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore); 2006 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2007 myCompactingMemStore.smallCellPreUpdateCounter.set(0); 2008 myCompactingMemStore.largeCellPreUpdateCounter.set(0); 2009 2010 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2011 Thread smallCellThread = new Thread(() -> { 2012 try { 2013 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2014 } catch (Throwable exception) { 2015 exceptionRef.set(exception); 2016 } 2017 }); 2018 smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME); 2019 smallCellThread.start(); 2020 2021 String oldThreadName = Thread.currentThread().getName(); 2022 try { 2023 /** 2024 * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then 2025 * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread 2026 * invokes flushInMemory. 2027 * <p/> 2028 * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread 2029 * can add cell to currentActive . That is to say when largeCellThread called flushInMemory 2030 * method, CompactingMemStore.active has no cell. 
2031 */ 2032 Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME); 2033 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2034 smallCellThread.join(); 2035 2036 for (int i = 0; i < 100; i++) { 2037 long currentTimestamp = timestamp + 100 + i; 2038 Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue); 2039 store.add(cell, new NonThreadSafeMemStoreSizing()); 2040 } 2041 } finally { 2042 Thread.currentThread().setName(oldThreadName); 2043 } 2044 2045 assertTrue(exceptionRef.get() == null); 2046 2047 } 2048 2049 // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds 2050 // InmemoryFlushSize 2051 @Test(timeout = 60000) 2052 public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception { 2053 Configuration conf = HBaseConfiguration.create(); 2054 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName()); 2055 2056 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2057 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2058 2059 MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore); 2060 2061 int size = (int) (myCompactingMemStore.getInmemoryFlushSize()); 2062 byte[] value = new byte[size + 1]; 2063 2064 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2065 long timestamp = EnvironmentEdgeManager.currentTime(); 2066 long seqId = 100; 2067 Cell cell = createCell(qf1, timestamp, seqId, value); 2068 int cellByteSize = MutableSegment.getCellLength(cell); 2069 store.add(cell, memStoreSizing); 2070 assertTrue(memStoreSizing.getCellsCount() == 1); 2071 assertTrue(memStoreSizing.getDataSize() == cellByteSize); 2072 // Waiting the in memory compaction completed, see HBASE-26438 2073 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2074 } 2075 2076 /** 2077 * This test is for HBASE-27464, before this JIRA,when init {@link CellChunkImmutableSegment} for 2078 
* 'COMPACT' action, we not force copy to current MSLab. When cell size bigger than 2079 * {@link MemStoreLABImpl#maxAlloc}, cell will stay in previous chunk which will recycle after 2080 * segment replace, and we may read wrong data when these chunk reused by others. 2081 */ 2082 @Test 2083 public void testForceCloneOfBigCellForCellChunkImmutableSegment() throws Exception { 2084 Configuration conf = HBaseConfiguration.create(); 2085 int maxAllocByteSize = conf.getInt(MemStoreLAB.MAX_ALLOC_KEY, MemStoreLAB.MAX_ALLOC_DEFAULT); 2086 2087 // Construct big cell,which is large than {@link MemStoreLABImpl#maxAlloc}. 2088 byte[] cellValue = new byte[maxAllocByteSize + 1]; 2089 final long timestamp = EnvironmentEdgeManager.currentTime(); 2090 final long seqId = 100; 2091 final byte[] rowKey1 = Bytes.toBytes("rowKey1"); 2092 final Cell originalCell1 = createCell(rowKey1, qf1, timestamp, seqId, cellValue); 2093 final byte[] rowKey2 = Bytes.toBytes("rowKey2"); 2094 final Cell originalCell2 = createCell(rowKey2, qf1, timestamp, seqId, cellValue); 2095 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2096 quals.add(qf1); 2097 2098 int cellByteSize = MutableSegment.getCellLength(originalCell1); 2099 int inMemoryFlushByteSize = cellByteSize - 1; 2100 2101 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2102 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName()); 2103 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2104 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(inMemoryFlushByteSize * 200)); 2105 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2106 2107 // Use {@link MemoryCompactionPolicy#EAGER} for always compacting. 
2108 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2109 .setInMemoryCompaction(MemoryCompactionPolicy.EAGER).build()); 2110 2111 MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore); 2112 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == inMemoryFlushByteSize); 2113 2114 // Data chunk Pool is disabled. 2115 assertTrue(ChunkCreator.getInstance().getMaxCount(ChunkType.DATA_CHUNK) == 0); 2116 2117 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2118 2119 // First compact 2120 store.add(originalCell1, memStoreSizing); 2121 // Waiting for the first in-memory compaction finished 2122 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2123 2124 StoreScanner storeScanner = 2125 (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1); 2126 SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2127 Cell resultCell1 = segmentScanner.next(); 2128 assertTrue(CellUtil.equals(resultCell1, originalCell1)); 2129 int cell1ChunkId = ((ExtendedCell) resultCell1).getChunkId(); 2130 assertTrue(cell1ChunkId != ExtendedCell.CELL_NOT_BASED_ON_CHUNK); 2131 assertNull(segmentScanner.next()); 2132 segmentScanner.close(); 2133 storeScanner.close(); 2134 Segment segment = segmentScanner.segment; 2135 assertTrue(segment instanceof CellChunkImmutableSegment); 2136 MemStoreLABImpl memStoreLAB1 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2137 assertTrue(!memStoreLAB1.isClosed()); 2138 assertTrue(!memStoreLAB1.chunks.isEmpty()); 2139 assertTrue(!memStoreLAB1.isReclaimed()); 2140 2141 // Second compact 2142 store.add(originalCell2, memStoreSizing); 2143 // Waiting for the second in-memory compaction finished 2144 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2145 2146 // Before HBASE-27464, here may throw java.lang.IllegalArgumentException: In CellChunkMap, cell 2147 // must be associated 
with chunk.. We were looking for a cell at index 0. 2148 // The cause for this exception is because the data chunk Pool is disabled,when the data chunks 2149 // are recycled after the second in-memory compaction finished,the 2150 // {@link ChunkCreator.putbackChunks} method does not put the chunks back to the data chunk 2151 // pool,it just removes them from {@link ChunkCreator#chunkIdMap},so in 2152 // {@link CellChunkMap#getCell} we could not get the data chunk by chunkId. 2153 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1); 2154 segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2155 Cell newResultCell1 = segmentScanner.next(); 2156 assertTrue(newResultCell1 != resultCell1); 2157 assertTrue(CellUtil.equals(newResultCell1, originalCell1)); 2158 2159 Cell resultCell2 = segmentScanner.next(); 2160 assertTrue(CellUtil.equals(resultCell2, originalCell2)); 2161 assertNull(segmentScanner.next()); 2162 segmentScanner.close(); 2163 storeScanner.close(); 2164 2165 segment = segmentScanner.segment; 2166 assertTrue(segment instanceof CellChunkImmutableSegment); 2167 MemStoreLABImpl memStoreLAB2 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2168 assertTrue(!memStoreLAB2.isClosed()); 2169 assertTrue(!memStoreLAB2.chunks.isEmpty()); 2170 assertTrue(!memStoreLAB2.isReclaimed()); 2171 assertTrue(memStoreLAB1.isClosed()); 2172 assertTrue(memStoreLAB1.chunks.isEmpty()); 2173 assertTrue(memStoreLAB1.isReclaimed()); 2174 } 2175 2176 // This test is for HBASE-26210 also, test write large cell and small cell concurrently when 2177 // InmemoryFlushSize is smaller,equal with and larger than cell size. 
2178 @Test 2179 public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently() 2180 throws IOException, InterruptedException { 2181 doWriteTestLargeCellAndSmallCellConcurrently( 2182 (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1); 2183 doWriteTestLargeCellAndSmallCellConcurrently( 2184 (smallCellByteSize, largeCellByteSize) -> largeCellByteSize); 2185 doWriteTestLargeCellAndSmallCellConcurrently( 2186 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1); 2187 doWriteTestLargeCellAndSmallCellConcurrently( 2188 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize); 2189 doWriteTestLargeCellAndSmallCellConcurrently( 2190 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1); 2191 } 2192 2193 private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize) 2194 throws IOException, InterruptedException { 2195 2196 Configuration conf = HBaseConfiguration.create(); 2197 2198 byte[] smallValue = new byte[3]; 2199 byte[] largeValue = new byte[100]; 2200 final long timestamp = EnvironmentEdgeManager.currentTime(); 2201 final long seqId = 100; 2202 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2203 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2204 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2205 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2206 int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize); 2207 boolean flushByteSizeLessThanSmallAndLargeCellSize = 2208 flushByteSize < (smallCellByteSize + largeCellByteSize); 2209 2210 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName()); 2211 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2212 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2213 2214 
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2215 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2216 2217 MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore); 2218 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2219 myCompactingMemStore.disableCompaction(); 2220 if (flushByteSizeLessThanSmallAndLargeCellSize) { 2221 myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true; 2222 } else { 2223 myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false; 2224 } 2225 2226 final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing(); 2227 final AtomicLong totalCellByteSize = new AtomicLong(0); 2228 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2229 Thread smallCellThread = new Thread(() -> { 2230 try { 2231 for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { 2232 long currentTimestamp = timestamp + i; 2233 Cell cell = createCell(qf1, currentTimestamp, seqId, smallValue); 2234 totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); 2235 store.add(cell, memStoreSizing); 2236 } 2237 } catch (Throwable exception) { 2238 exceptionRef.set(exception); 2239 2240 } 2241 }); 2242 smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME); 2243 smallCellThread.start(); 2244 2245 String oldThreadName = Thread.currentThread().getName(); 2246 try { 2247 /** 2248 * When flushByteSizeLessThanSmallAndLargeCellSize is true: 2249 * </p> 2250 * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then 2251 * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then 2252 * largeCellThread invokes flushInMemory. 2253 * <p/> 2254 * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread 2255 * can run into MyCompactingMemStore3.checkAndAddToActiveSize again. 
2256 * <p/> 2257 * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and 2258 * largeCellThread concurrently write one cell and wait each other, and then write another 2259 * cell etc. 2260 */ 2261 Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME); 2262 for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { 2263 long currentTimestamp = timestamp + i; 2264 Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue); 2265 totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); 2266 store.add(cell, memStoreSizing); 2267 } 2268 smallCellThread.join(); 2269 2270 assertTrue(exceptionRef.get() == null); 2271 assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2)); 2272 assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get()); 2273 if (flushByteSizeLessThanSmallAndLargeCellSize) { 2274 assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT); 2275 } else { 2276 assertTrue( 2277 myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1)); 2278 } 2279 } finally { 2280 Thread.currentThread().setName(oldThreadName); 2281 } 2282 } 2283 2284 /** 2285 * <pre> 2286 * This test is for HBASE-26384, 2287 * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()} 2288 * execute concurrently. 2289 * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs 2290 * for both branch-2 and master): 2291 * 1. The {@link CompactingMemStore} size exceeds 2292 * {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new 2293 * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a 2294 * in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}. 2295 * 2. The in memory compact thread starts and then stopping before 2296 * {@link CompactingMemStore#flattenOneSegment}. 2297 * 3. 
The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the 2298 * snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory 2299 * compact thread continues. 2300 * Assuming {@link VersionedSegmentsList#version} returned from 2301 * {@link CompactingMemStore#getImmutableSegments} is v. 2302 * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}. 2303 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2304 * {@link CompactionPipeline#version} is still v. 2305 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2306 * {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull} 2307 * thinks it is successful and continue flushing,but the {@link ImmutableSegment} in 2308 * {@link CompactionPipeline} has changed because 2309 * {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not 2310 * removed in fact and still remaining in {@link CompactionPipeline}. 2311 * 2312 * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior: 2313 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2314 * {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to 2315 * v+1. 2316 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2317 * {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull} 2318 * failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once 2319 * again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now, 2320 * {@link CompactingMemStore#swapPipelineWithNull} succeeds. 
2321 * </pre> 2322 */ 2323 @Test 2324 public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception { 2325 Configuration conf = HBaseConfiguration.create(); 2326 2327 byte[] smallValue = new byte[3]; 2328 byte[] largeValue = new byte[9]; 2329 final long timestamp = EnvironmentEdgeManager.currentTime(); 2330 final long seqId = 100; 2331 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2332 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2333 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2334 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2335 int totalCellByteSize = (smallCellByteSize + largeCellByteSize); 2336 int flushByteSize = totalCellByteSize - 2; 2337 2338 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2339 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName()); 2340 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2341 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2342 2343 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2344 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2345 2346 MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore); 2347 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2348 2349 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2350 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2351 2352 String oldThreadName = Thread.currentThread().getName(); 2353 try { 2354 Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME); 2355 /** 2356 * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters 2357 * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} 2358 * would invoke {@link CompactingMemStore#stopCompaction}. 
2359 */ 2360 myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); 2361 2362 MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); 2363 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2364 2365 assertTrue(memStoreSnapshot.getCellsCount() == 2); 2366 assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize); 2367 VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); 2368 assertTrue(segments.getNumOfSegments() == 0); 2369 assertTrue(segments.getNumOfCells() == 0); 2370 assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1); 2371 assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); 2372 } finally { 2373 Thread.currentThread().setName(oldThreadName); 2374 } 2375 } 2376 2377 /** 2378 * <pre> 2379 * This test is for HBASE-26384, 2380 * test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()} 2381 * and writeMemStore execute concurrently. 2382 * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs 2383 * for both branch-2 and master): 2384 * 1. The {@link CompactingMemStore} size exceeds 2385 * {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new 2386 * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a 2387 * in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}. 2388 * 2. The in memory compact thread starts and then stopping before 2389 * {@link CompactingMemStore#flattenOneSegment}. 2390 * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the 2391 * snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory 2392 * compact thread continues. 2393 * Assuming {@link VersionedSegmentsList#version} returned from 2394 * {@link CompactingMemStore#getImmutableSegments} is v. 2395 * 4. 
The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}. 2396 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2397 * {@link CompactionPipeline#version} is still v. 2398 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2399 * {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull} 2400 * thinks it is successful and continue flushing,but the {@link ImmutableSegment} in 2401 * {@link CompactionPipeline} has changed because 2402 * {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not 2403 * removed in fact and still remaining in {@link CompactionPipeline}. 2404 * 2405 * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior, 2406 * and I add step 7-8 to test there is new segment added before retry. 2407 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2408 * {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to 2409 * v+1. 2410 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2411 * {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull} 2412 * failed and retry,{@link VersionedSegmentsList#version} returned from 2413 * {@link CompactingMemStore#getImmutableSegments} is v+1. 2414 * 7. The write thread continues writing to {@link CompactingMemStore} and 2415 * {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()}, 2416 * {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new 2417 * {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline}, 2418 * {@link CompactionPipeline#version} is still v+1. 2419 * 8. 
The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2420 * {@link CompactionPipeline#version} is still v+1, 2421 * {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment} 2422 * remained at the head of {@link CompactingMemStore#pipeline},the old is removed by 2423 * {@link CompactingMemStore#swapPipelineWithNull}. 2424 * </pre> 2425 */ 2426 @Test 2427 public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception { 2428 Configuration conf = HBaseConfiguration.create(); 2429 2430 byte[] smallValue = new byte[3]; 2431 byte[] largeValue = new byte[9]; 2432 final long timestamp = EnvironmentEdgeManager.currentTime(); 2433 final long seqId = 100; 2434 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2435 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2436 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2437 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2438 int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); 2439 int flushByteSize = firstWriteCellByteSize - 2; 2440 2441 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 
2442 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName()); 2443 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2444 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2445 2446 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2447 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2448 2449 final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore); 2450 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2451 2452 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2453 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2454 2455 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2456 final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue); 2457 final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue); 2458 final int writeAgainCellByteSize = 2459 MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2); 2460 final Thread writeAgainThread = new Thread(() -> { 2461 try { 2462 myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await(); 2463 2464 store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing()); 2465 store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing()); 2466 2467 myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await(); 2468 } catch (Throwable exception) { 2469 exceptionRef.set(exception); 2470 } 2471 }); 2472 writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME); 2473 writeAgainThread.start(); 2474 2475 String oldThreadName = Thread.currentThread().getName(); 2476 try { 2477 Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME); 2478 /** 2479 * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters 2480 * {@link 
CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} 2481 * would invoke {@link CompactingMemStore#stopCompaction}. 2482 */ 2483 myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); 2484 MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); 2485 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2486 writeAgainThread.join(); 2487 2488 assertTrue(memStoreSnapshot.getCellsCount() == 2); 2489 assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize); 2490 VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); 2491 assertTrue(segments.getNumOfSegments() == 1); 2492 assertTrue( 2493 ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize); 2494 assertTrue(segments.getNumOfCells() == 2); 2495 assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2); 2496 assertTrue(exceptionRef.get() == null); 2497 assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); 2498 } finally { 2499 Thread.currentThread().setName(oldThreadName); 2500 } 2501 } 2502 2503 /** 2504 * <pre> 2505 * This test is for HBASE-26465, 2506 * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute 2507 * concurrently. The threads sequence before HBASE-26465 is: 2508 * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have be added to 2509 * {@link DefaultMemStore}. 2510 * 2.The flush thread stopping before {@link DefaultMemStore#clearSnapshot} in 2511 * {@link HStore#updateStorefiles} after completed flushing memStore to hfile. 2512 * 3.The scan thread starts and stopping after {@link DefaultMemStore#getSnapshotSegments} in 2513 * {@link DefaultMemStore#getScanners},here the scan thread gets the 2514 * {@link DefaultMemStore#snapshot} which is created by the flush thread. 
2515 * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and close 2516 * {@link DefaultMemStore#snapshot},because the reference count of the corresponding 2517 * {@link MemStoreLABImpl} is 0, the {@link Chunk}s in corresponding {@link MemStoreLABImpl} 2518 * are recycled. 2519 * 5.The scan thread continues {@link DefaultMemStore#getScanners},and create a 2520 * {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increase the 2521 * reference count of the corresponding {@link MemStoreLABImpl}, but {@link Chunk}s in 2522 * corresponding {@link MemStoreLABImpl} are recycled by step 4, and these {@link Chunk}s may 2523 * be overwritten by other write threads,which may cause serious problem. 2524 * After HBASE-26465,{@link DefaultMemStore#getScanners} and 2525 * {@link DefaultMemStore#clearSnapshot} could not execute concurrently. 2526 * </pre> 2527 */ 2528 @Test 2529 public void testClearSnapshotGetScannerConcurrently() throws Exception { 2530 Configuration conf = HBaseConfiguration.create(); 2531 2532 byte[] smallValue = new byte[3]; 2533 byte[] largeValue = new byte[9]; 2534 final long timestamp = EnvironmentEdgeManager.currentTime(); 2535 final long seqId = 100; 2536 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2537 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2538 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2539 quals.add(qf1); 2540 quals.add(qf2); 2541 2542 conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName()); 2543 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2544 2545 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2546 MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore); 2547 myDefaultMemStore.store = store; 2548 2549 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2550 store.add(smallCell, memStoreSizing); 2551 store.add(largeCell, memStoreSizing); 
2552 2553 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2554 final Thread flushThread = new Thread(() -> { 2555 try { 2556 flushStore(store, id++); 2557 } catch (Throwable exception) { 2558 exceptionRef.set(exception); 2559 } 2560 }); 2561 flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME); 2562 flushThread.start(); 2563 2564 String oldThreadName = Thread.currentThread().getName(); 2565 StoreScanner storeScanner = null; 2566 try { 2567 Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME); 2568 2569 /** 2570 * Wait flush thread stopping before {@link DefaultMemStore#doClearSnapshot} 2571 */ 2572 myDefaultMemStore.getScannerCyclicBarrier.await(); 2573 2574 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 2575 flushThread.join(); 2576 2577 if (myDefaultMemStore.shouldWait) { 2578 SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2579 MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2580 assertTrue(memStoreLAB.isClosed()); 2581 assertTrue(!memStoreLAB.chunks.isEmpty()); 2582 assertTrue(!memStoreLAB.isReclaimed()); 2583 2584 Cell cell1 = segmentScanner.next(); 2585 CellUtil.equals(smallCell, cell1); 2586 Cell cell2 = segmentScanner.next(); 2587 CellUtil.equals(largeCell, cell2); 2588 assertNull(segmentScanner.next()); 2589 } else { 2590 List<Cell> results = new ArrayList<>(); 2591 storeScanner.next(results); 2592 assertEquals(2, results.size()); 2593 CellUtil.equals(smallCell, results.get(0)); 2594 CellUtil.equals(largeCell, results.get(1)); 2595 } 2596 assertTrue(exceptionRef.get() == null); 2597 } finally { 2598 if (storeScanner != null) { 2599 storeScanner.close(); 2600 } 2601 Thread.currentThread().setName(oldThreadName); 2602 } 2603 } 2604 2605 @SuppressWarnings("unchecked") 2606 private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) { 2607 
List<T> resultScanners = new ArrayList<T>(); 2608 for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) { 2609 if (keyValueScannerClass.isInstance(keyValueScanner)) { 2610 resultScanners.add((T) keyValueScanner); 2611 } 2612 } 2613 assertTrue(resultScanners.size() == 1); 2614 return resultScanners.get(0); 2615 } 2616 2617 @Test 2618 public void testOnConfigurationChange() throws IOException { 2619 final int COMMON_MAX_FILES_TO_COMPACT = 10; 2620 final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8; 2621 final int STORE_MAX_FILES_TO_COMPACT = 6; 2622 2623 // Build a table that its maxFileToCompact different from common configuration. 2624 Configuration conf = HBaseConfiguration.create(); 2625 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2626 COMMON_MAX_FILES_TO_COMPACT); 2627 conf.setBoolean(CACHE_DATA_ON_READ_KEY, false); 2628 conf.setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true); 2629 conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true); 2630 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 2631 .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2632 String.valueOf(STORE_MAX_FILES_TO_COMPACT)) 2633 .build(); 2634 init(this.name.getMethodName(), conf, hcd); 2635 2636 // After updating common configuration, the conf in HStore itself must not be changed. 
2637 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2638 NEW_COMMON_MAX_FILES_TO_COMPACT); 2639 this.store.onConfigurationChange(conf); 2640 2641 assertEquals(STORE_MAX_FILES_TO_COMPACT, 2642 store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact()); 2643 2644 assertEquals(conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ), false); 2645 assertEquals(conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), true); 2646 assertEquals(conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), true); 2647 2648 // reset to default values 2649 conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ); 2650 conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE); 2651 conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE); 2652 this.store.onConfigurationChange(conf); 2653 } 2654 2655 /** 2656 * This test is for HBASE-26476 2657 */ 2658 @Test 2659 public void testExtendsDefaultMemStore() throws Exception { 2660 Configuration conf = HBaseConfiguration.create(); 2661 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2662 2663 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2664 assertTrue(this.store.memstore.getClass() == DefaultMemStore.class); 2665 tearDown(); 2666 2667 conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName()); 2668 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2669 assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class); 2670 } 2671 2672 static class CustomDefaultMemStore extends DefaultMemStore { 2673 2674 public CustomDefaultMemStore(Configuration conf, CellComparator c, 2675 RegionServicesForStores regionServices) { 2676 super(conf, c, regionServices); 2677 } 2678 2679 } 2680 2681 /** 2682 * This test is for HBASE-26488 2683 */ 2684 @Test 2685 public void testMemoryLeakWhenFlushMemStoreRetrying() throws 
Exception { 2686 2687 Configuration conf = HBaseConfiguration.create(); 2688 2689 byte[] smallValue = new byte[3]; 2690 byte[] largeValue = new byte[9]; 2691 final long timestamp = EnvironmentEdgeManager.currentTime(); 2692 final long seqId = 100; 2693 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2694 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2695 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2696 quals.add(qf1); 2697 quals.add(qf2); 2698 2699 conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName()); 2700 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2701 conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, 2702 MyDefaultStoreFlusher.class.getName()); 2703 2704 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2705 MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore); 2706 assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher); 2707 2708 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2709 store.add(smallCell, memStoreSizing); 2710 store.add(largeCell, memStoreSizing); 2711 flushStore(store, id++); 2712 2713 MemStoreLABImpl memStoreLAB = 2714 (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB()); 2715 assertTrue(memStoreLAB.isClosed()); 2716 assertTrue(memStoreLAB.getRefCntValue() == 0); 2717 assertTrue(memStoreLAB.isReclaimed()); 2718 assertTrue(memStoreLAB.chunks.isEmpty()); 2719 StoreScanner storeScanner = null; 2720 try { 2721 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 2722 assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1); 2723 assertTrue(store.memstore.size().getCellsCount() == 0); 2724 assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0); 2725 assertTrue(storeScanner.currentScanners.size() == 1); 2726 
assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner); 2727 2728 List<Cell> results = new ArrayList<>(); 2729 storeScanner.next(results); 2730 assertEquals(2, results.size()); 2731 CellUtil.equals(smallCell, results.get(0)); 2732 CellUtil.equals(largeCell, results.get(1)); 2733 } finally { 2734 if (storeScanner != null) { 2735 storeScanner.close(); 2736 } 2737 } 2738 } 2739 2740 static class MyDefaultMemStore1 extends DefaultMemStore { 2741 2742 private ImmutableSegment snapshotImmutableSegment; 2743 2744 public MyDefaultMemStore1(Configuration conf, CellComparator c, 2745 RegionServicesForStores regionServices) { 2746 super(conf, c, regionServices); 2747 } 2748 2749 @Override 2750 public MemStoreSnapshot snapshot() { 2751 MemStoreSnapshot result = super.snapshot(); 2752 this.snapshotImmutableSegment = snapshot; 2753 return result; 2754 } 2755 2756 } 2757 2758 public static class MyDefaultStoreFlusher extends DefaultStoreFlusher { 2759 private static final AtomicInteger failCounter = new AtomicInteger(1); 2760 private static final AtomicInteger counter = new AtomicInteger(0); 2761 2762 public MyDefaultStoreFlusher(Configuration conf, HStore store) { 2763 super(conf, store); 2764 } 2765 2766 @Override 2767 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 2768 MonitoredTask status, ThroughputController throughputController, 2769 FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException { 2770 counter.incrementAndGet(); 2771 return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker, 2772 writerCreationTracker); 2773 } 2774 2775 @Override 2776 protected void performFlush(InternalScanner scanner, final CellSink sink, 2777 ThroughputController throughputController) throws IOException { 2778 2779 final int currentCount = counter.get(); 2780 CellSink newCellSink = (cell) -> { 2781 if (currentCount <= failCounter.get()) { 2782 throw new IOException("Simulated 
exception by tests"); 2783 } 2784 sink.append(cell); 2785 }; 2786 super.performFlush(scanner, newCellSink, throughputController); 2787 } 2788 } 2789 2790 /** 2791 * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB} 2792 */ 2793 @Test 2794 public void testImmutableMemStoreLABRefCnt() throws Exception { 2795 Configuration conf = HBaseConfiguration.create(); 2796 2797 byte[] smallValue = new byte[3]; 2798 byte[] largeValue = new byte[9]; 2799 final long timestamp = EnvironmentEdgeManager.currentTime(); 2800 final long seqId = 100; 2801 final Cell smallCell1 = createCell(qf1, timestamp, seqId, smallValue); 2802 final Cell largeCell1 = createCell(qf2, timestamp, seqId, largeValue); 2803 final Cell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue); 2804 final Cell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue); 2805 final Cell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue); 2806 final Cell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue); 2807 2808 int smallCellByteSize = MutableSegment.getCellLength(smallCell1); 2809 int largeCellByteSize = MutableSegment.getCellLength(largeCell1); 2810 int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); 2811 int flushByteSize = firstWriteCellByteSize - 2; 2812 2813 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 
2814 conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName()); 2815 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2816 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2817 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2818 2819 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2820 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2821 2822 final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore); 2823 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2824 myCompactingMemStore.allowCompaction.set(false); 2825 2826 NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2827 store.add(smallCell1, memStoreSizing); 2828 store.add(largeCell1, memStoreSizing); 2829 store.add(smallCell2, memStoreSizing); 2830 store.add(largeCell2, memStoreSizing); 2831 store.add(smallCell3, memStoreSizing); 2832 store.add(largeCell3, memStoreSizing); 2833 VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); 2834 assertTrue(versionedSegmentsList.getNumOfSegments() == 3); 2835 List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments(); 2836 List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size()); 2837 for (ImmutableSegment segment : segments) { 2838 memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB()); 2839 } 2840 List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE); 2841 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2842 assertTrue(memStoreLAB.getRefCntValue() == 2); 2843 } 2844 2845 myCompactingMemStore.allowCompaction.set(true); 2846 myCompactingMemStore.flushInMemory(); 2847 2848 versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); 2849 assertTrue(versionedSegmentsList.getNumOfSegments() == 1); 2850 ImmutableMemStoreLAB 
immutableMemStoreLAB = 2851 (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB()); 2852 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2853 assertTrue(memStoreLAB.getRefCntValue() == 2); 2854 } 2855 2856 List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE); 2857 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2858 assertTrue(memStoreLAB.getRefCntValue() == 2); 2859 } 2860 assertTrue(immutableMemStoreLAB.getRefCntValue() == 2); 2861 for (KeyValueScanner scanner : scanners1) { 2862 scanner.close(); 2863 } 2864 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2865 assertTrue(memStoreLAB.getRefCntValue() == 1); 2866 } 2867 for (KeyValueScanner scanner : scanners2) { 2868 scanner.close(); 2869 } 2870 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2871 assertTrue(memStoreLAB.getRefCntValue() == 1); 2872 } 2873 assertTrue(immutableMemStoreLAB.getRefCntValue() == 1); 2874 flushStore(store, id++); 2875 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2876 assertTrue(memStoreLAB.getRefCntValue() == 0); 2877 } 2878 assertTrue(immutableMemStoreLAB.getRefCntValue() == 0); 2879 assertTrue(immutableMemStoreLAB.isClosed()); 2880 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2881 assertTrue(memStoreLAB.isClosed()); 2882 assertTrue(memStoreLAB.isReclaimed()); 2883 assertTrue(memStoreLAB.chunks.isEmpty()); 2884 } 2885 } 2886 2887 private HStoreFile mockStoreFileWithLength(long length) { 2888 HStoreFile sf = mock(HStoreFile.class); 2889 StoreFileReader sfr = mock(StoreFileReader.class); 2890 when(sf.isHFile()).thenReturn(true); 2891 when(sf.getReader()).thenReturn(sfr); 2892 when(sfr.length()).thenReturn(length); 2893 return sf; 2894 } 2895 2896 private static class MyThread extends Thread { 2897 private StoreScanner scanner; 2898 private KeyValueHeap heap; 2899 2900 public MyThread(StoreScanner scanner) { 2901 this.scanner = scanner; 2902 } 2903 2904 public KeyValueHeap getHeap() { 2905 return 
this.heap; 2906 } 2907 2908 @Override 2909 public void run() { 2910 scanner.trySwitchToStreamRead(); 2911 heap = scanner.heap; 2912 } 2913 } 2914 2915 private static class MyMemStoreCompactor extends MemStoreCompactor { 2916 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 2917 private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); 2918 2919 public MyMemStoreCompactor(CompactingMemStore compactingMemStore, 2920 MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException { 2921 super(compactingMemStore, compactionPolicy); 2922 } 2923 2924 @Override 2925 public boolean start() throws IOException { 2926 boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; 2927 if (isFirst) { 2928 try { 2929 START_COMPACTOR_LATCH.await(); 2930 return super.start(); 2931 } catch (InterruptedException ex) { 2932 throw new RuntimeException(ex); 2933 } 2934 } 2935 return super.start(); 2936 } 2937 } 2938 2939 public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore { 2940 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 2941 2942 public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c, 2943 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 2944 throws IOException { 2945 super(conf, c, store, regionServices, compactionPolicy); 2946 } 2947 2948 @Override 2949 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) 2950 throws IllegalArgumentIOException { 2951 return new MyMemStoreCompactor(this, compactionPolicy); 2952 } 2953 2954 @Override 2955 protected boolean setInMemoryCompactionFlag() { 2956 boolean rval = super.setInMemoryCompactionFlag(); 2957 if (rval) { 2958 RUNNER_COUNT.incrementAndGet(); 2959 if (LOG.isDebugEnabled()) { 2960 LOG.debug("runner count: " + RUNNER_COUNT.get()); 2961 } 2962 } 2963 return rval; 2964 } 2965 } 2966 2967 public static class 
MyCompactingMemStore extends CompactingMemStore {
    // While START_TEST is set, the two hooks below force this ordering between the thread that
    // calls createList and the thread that calls pushActiveToPipeline:
    // createList is entered -> pushActiveToPipeline runs -> createList returns.
    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
    private final CountDownLatch snapshotLatch = new CountDownLatch(1);

    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store,
      RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, c, store, regionServices, compactionPolicy);
    }

    /**
     * Returns a new {@link ArrayList} with the given capacity. While {@code START_TEST} is set it
     * first releases {@code getScannerLatch} and then blocks until {@code snapshotLatch} has been
     * released by {@link #pushActiveToPipeline}.
     */
    @Override
    protected List<KeyValueScanner> createList(int capacity) {
      if (START_TEST.get()) {
        try {
          getScannerLatch.countDown();
          snapshotLatch.await();
        } catch (InterruptedException e) {
          // Keep the interrupt visible to whoever owns this thread.
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
      }
      return new ArrayList<>(capacity);
    }

    /**
     * Delegates to the parent implementation. While {@code START_TEST} is set it waits for
     * {@code getScannerLatch} before delegating and releases {@code snapshotLatch} afterwards.
     */
    @Override
    protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) {
      if (START_TEST.get()) {
        try {
          getScannerLatch.await();
        } catch (InterruptedException e) {
          // Keep the interrupt visible to whoever owns this thread.
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
      }

      super.pushActiveToPipeline(active, checkEmpty);
      if (START_TEST.get()) {
        snapshotLatch.countDown();
      }
    }
  }

  /** Callback invoked by {@link MyList#add(Object)} before the element is appended. */
  @FunctionalInterface
  interface MyListHook {
    /**
     * @param currentSize size of the list before the pending element is added
     */
    void hook(int currentSize);
  }

  /**
   * A {@link List} that forwards every operation to a backing {@link ArrayList}. Only
   * {@link #add(Object)} notifies the hook; all other mutators (including the {@code addAll}
   * variants and {@link #add(int, Object)}) are forwarded without calling it.
   */
  private static class MyList<T> implements List<T> {
    private final List<T> delegatee = new ArrayList<>();
    private final MyListHook hookAtAdd;

    MyList(final MyListHook hookAtAdd) {
      this.hookAtAdd = hookAtAdd;
    }

    @Override
    public int size() {
      return delegatee.size();
    }

    @Override
    public boolean isEmpty() {
      return delegatee.isEmpty();
    }

    @Override
    public boolean contains(Object o) {
      return delegatee.contains(o);
    }

    @Override
    public Iterator<T> iterator() {
      return delegatee.iterator();
    }

    @Override
    public Object[] toArray() {
      return delegatee.toArray();
    }

    @Override
    public <R> R[] toArray(R[] a) {
      return delegatee.toArray(a);
    }

    /** Calls the hook with the current size, then appends {@code e}. */
    @Override
    public boolean add(T e) {
      hookAtAdd.hook(size());
      return delegatee.add(e);
    }

    @Override
    public boolean remove(Object o) {
      return delegatee.remove(o);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
      return delegatee.containsAll(c);
    }

    @Override
    public boolean addAll(Collection<? extends T> c) {
      return delegatee.addAll(c);
    }

    @Override
    public boolean addAll(int index, Collection<? extends T> c) {
      return delegatee.addAll(index, c);
    }

    @Override
    public boolean removeAll(Collection<?> c) {
      return delegatee.removeAll(c);
    }

    @Override
    public boolean retainAll(Collection<?> c) {
      return delegatee.retainAll(c);
    }

    @Override
    public void clear() {
      delegatee.clear();
    }

    @Override
    public T get(int index) {
      return delegatee.get(index);
    }

    @Override
    public T set(int index, T element) {
      return delegatee.set(index, element);
    }

    @Override
    public void add(int index, T element) {
      delegatee.add(index, element);
    }

    @Override
    public T remove(int index) {
      return delegatee.remove(index);
    }

    @Override
    public int indexOf(Object o) {
      return delegatee.indexOf(o);
    }

    @Override
    public int lastIndexOf(Object o) {
      return delegatee.lastIndexOf(o);
    }

    @Override
    public ListIterator<T> listIterator() {
      return delegatee.listIterator();
    }

    @Override
    public ListIterator<T> listIterator(int index) {
      return delegatee.listIterator(index);
    }

    @Override
    public List<T> subList(int fromIndex, int toIndex) {
      return delegatee.subList(fromIndex, toIndex);
    }
  }

  /**
   * Memstore that uses two 2-party barriers to interleave the threads named
   * {@code largeCellThread} and {@code smallCellThread}: the small-cell thread finishes
   * {@code checkAndAddToActiveSize} before the large-cell thread runs its own check, and the
   * small-cell thread's {@code doAdd} proceeds only after the large-cell thread has finished
   * {@code flushInMemory}.
   */
  public static class MyCompactingMemStore2 extends CompactingMemStore {
    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
    private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
    private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);

    public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, cellComparator, store, regionServices, compactionPolicy);
    }

    @Override
    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
      MemStoreSizing memstoreSizing) {
      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
        int currentCount = largeCellPreUpdateCounter.incrementAndGet();
        if (currentCount <= 1) {
          // smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
          // largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
          // largeCellThread invokes flushInMemory.
          awaitBarrierUnchecked(preCyclicBarrier);
        }
      }

      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
        awaitBarrierUnchecked(preCyclicBarrier);
      }
      return returnValue;
    }

    @Override
    protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
        // After largeCellThread finished the flushInMemory method, smallCellThread can add its
        // cell to currentActive. That is to say, when largeCellThread called flushInMemory,
        // currentActive had no cell.
        awaitBarrierUnchecked(postCyclicBarrier);
      }
      super.doAdd(currentActive, cell, memstoreSizing);
    }

    @Override
    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
      super.flushInMemory(currentActiveMutableSegment);
      if (
        Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)
          && largeCellPreUpdateCounter.get() <= 1
      ) {
        awaitBarrierUnchecked(postCyclicBarrier);
      }
    }

  }

  /**
   * Memstore with two modes selected by {@code flushByteSizeLessThanSmallAndLargeCellSize}.
   * When the flag is {@code true}, the thread named {@code smallCellThread} finishes
   * {@code checkAndAddToActiveSize} before {@code largeCellThread} runs its check, and the
   * small-cell thread leaves {@code postUpdate} only once the large-cell thread has finished
   * {@code flushInMemory}. When the flag is {@code false}, every {@code postUpdate} caller meets
   * on the 2-party post barrier and {@code flushInMemory} only counts invocations.
   */
  public static class MyCompactingMemStore3 extends CompactingMemStore {
    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";

    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
    private final AtomicInteger flushCounter = new AtomicInteger(0);
    private static final int CELL_COUNT = 5;
    private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;

    public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator,
      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, cellComparator, store, regionServices, compactionPolicy);
    }

    @Override
    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
      MemStoreSizing memstoreSizing) {
      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
        return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
      }
      // The large-cell thread waits before its check, the small-cell thread after its check.
      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
        awaitBarrierUnchecked(preCyclicBarrier);
      }

      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
        awaitBarrierUnchecked(preCyclicBarrier);
      }
      return returnValue;
    }

    @Override
    protected void postUpdate(MutableSegment currentActiveMutableSegment) {
      super.postUpdate(currentActiveMutableSegment);
      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
        awaitBarrierUnchecked(postCyclicBarrier);
        return;
      }

      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
        awaitBarrierUnchecked(postCyclicBarrier);
      }
    }

    @Override
    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
      super.flushInMemory(currentActiveMutableSegment);
      flushCounter.incrementAndGet();
      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
        return;
      }

      // In this mode only the large-cell thread is expected to trigger the in-memory flush.
      assertEquals(LARGE_CELL_THREAD_NAME, Thread.currentThread().getName());
      awaitBarrierUnchecked(postCyclicBarrier);
    }

    void disableCompaction() {
      allowCompaction.set(false);
    }

    void enableCompaction() {
      allowCompaction.set(true);
    }

  }

  /**
   * Memstore that interleaves the thread named {@code takeSnapShotThread} with the thread that
   * runs {@code flattenOneSegment}, and asserts that the snapshot thread's first
   * {@code swapPipelineWithNull} fails while its second one succeeds.
   */
  public static class MyCompactingMemStore4 extends CompactingMemStore {
    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
    /**
     * {@link CompactingMemStore#flattenOneSegment} must execute after
     * {@link CompactingMemStore#getImmutableSegments}
     */
    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
    /**
     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
     */
    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
    /**
     * Only once the in memory compact thread has entered
     * {@link CompactingMemStore#flattenOneSegment} may the snapshot thread start
     * {@link CompactingMemStore#snapshot}, because {@link CompactingMemStore#snapshot} would
     * invoke {@link CompactingMemStore#stopCompaction}.
     */
    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
    /**
     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
     */
    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);

    public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, cellComparator, store, regionServices, compactionPolicy);
    }

    @Override
    public VersionedSegmentsList getImmutableSegments() {
      VersionedSegmentsList result = super.getImmutableSegments();
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
        if (currentCount <= 1) {
          awaitBarrierUnchecked(flattenOneSegmentPreCyclicBarrier);
        }
      }
      return result;
    }

    @Override
    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
        if (currentCount <= 1) {
          awaitBarrierUnchecked(flattenOneSegmentPostCyclicBarrier);
        }
      }
      boolean result = super.swapPipelineWithNull(segments);
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = swapPipelineWithNullCounter.get();
        if (currentCount <= 1) {
          assertFalse(result);
        }
        if (currentCount == 2) {
          assertTrue(result);
        }
      }
      return result;

    }

    @Override
    public void flattenOneSegment(long requesterVersion, Action action) {
      int currentCount = flattenOneSegmentCounter.incrementAndGet();
      if (currentCount <= 1) {
        // CompactingMemStore#snapshot could start.
        awaitBarrierUnchecked(snapShotStartCyclicCyclicBarrier);
        awaitBarrierUnchecked(flattenOneSegmentPreCyclicBarrier);
      }
      super.flattenOneSegment(requesterVersion, action);
      if (currentCount <= 1) {
        awaitBarrierUnchecked(flattenOneSegmentPostCyclicBarrier);
      }
    }

    @Override
    protected boolean setInMemoryCompactionFlag() {
      boolean result = super.setInMemoryCompactionFlag();
      assertTrue(result);
      setInMemoryCompactionFlagCounter.incrementAndGet();
      return result;
    }

    @Override
    void inMemoryCompaction() {
      try {
        super.inMemoryCompaction();
      } finally {
        awaitBarrierUnchecked(inMemoryCompactionEndCyclicBarrier);
      }
    }

  }

  /**
   * Same interleaving as {@code MyCompactingMemStore4}, plus two extra barriers that hold the
   * snapshot thread's second {@code swapPipelineWithNull} (and the exit of the first
   * {@code flattenOneSegment}) until a third party has arrived on
   * {@code writeMemStoreAgainEndCyclicBarrier}.
   */
  public static class MyCompactingMemStore5 extends CompactingMemStore {
    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
    private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
    /**
     * {@link CompactingMemStore#flattenOneSegment} must execute after
     * {@link CompactingMemStore#getImmutableSegments}
     */
    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
    /**
     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
     */
    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
    /**
     * Only once the in memory compact thread has entered
     * {@link CompactingMemStore#flattenOneSegment} may the snapshot thread start
     * {@link CompactingMemStore#snapshot}, because {@link CompactingMemStore#snapshot} would
     * invoke {@link CompactingMemStore#stopCompaction}.
     */
    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
    /**
     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
     */
    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
    /**
     * Only when the snapshot thread retries {@link CompactingMemStore#swapPipelineWithNull} may
     * the writeAgain thread start.
     */
    private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
    /**
     * This is used for snapshot thread, writeAgain thread and in memory compact thread. Only
     * when the writeAgain thread completes would {@link CompactingMemStore#swapPipelineWithNull}
     * execute, and the in memory compact thread would exit, because we expect in memory compact
     * to execute only once.
     */
    private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);

    public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, cellComparator, store, regionServices, compactionPolicy);
    }

    @Override
    public VersionedSegmentsList getImmutableSegments() {
      VersionedSegmentsList result = super.getImmutableSegments();
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
        if (currentCount <= 1) {
          awaitBarrierUnchecked(flattenOneSegmentPreCyclicBarrier);
        }
      }

      return result;
    }

    @Override
    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
        if (currentCount <= 1) {
          awaitBarrierUnchecked(flattenOneSegmentPostCyclicBarrier);
        }

        if (currentCount == 2) {
          // Only when the snapshot thread retries CompactingMemStore#swapPipelineWithNull may
          // the writeAgain thread start.
          awaitBarrierUnchecked(writeMemStoreAgainStartCyclicBarrier);
          // Only when the writeAgain thread completes would the retried
          // CompactingMemStore#swapPipelineWithNull execute.
          awaitBarrierUnchecked(writeMemStoreAgainEndCyclicBarrier);
        }

      }
      boolean result = super.swapPipelineWithNull(segments);
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = swapPipelineWithNullCounter.get();
        if (currentCount <= 1) {
          assertFalse(result);
        }
        if (currentCount == 2) {
          assertTrue(result);
        }
      }
      return result;

    }

    @Override
    public void flattenOneSegment(long requesterVersion, Action action) {
      int currentCount = flattenOneSegmentCounter.incrementAndGet();
      if (currentCount <= 1) {
        // CompactingMemStore#snapshot could start.
        awaitBarrierUnchecked(snapShotStartCyclicCyclicBarrier);
        awaitBarrierUnchecked(flattenOneSegmentPreCyclicBarrier);
      }
      super.flattenOneSegment(requesterVersion, action);
      if (currentCount <= 1) {
        awaitBarrierUnchecked(flattenOneSegmentPostCyclicBarrier);
        // Only when the writeAgain thread completes would the in memory compact thread exit,
        // because we expect in memory compact to execute only once.
        awaitBarrierUnchecked(writeMemStoreAgainEndCyclicBarrier);
      }
    }

    @Override
    protected boolean setInMemoryCompactionFlag() {
      boolean result = super.setInMemoryCompactionFlag();
      int count = setInMemoryCompactionFlagCounter.incrementAndGet();
      if (count <= 1) {
        assertTrue(result);
      }
      if (count == 2) {
        assertFalse(result);
      }
      return result;
    }

    @Override
    void inMemoryCompaction() {
      try {
        super.inMemoryCompaction();
      } finally {
        awaitBarrierUnchecked(inMemoryCompactionEndCyclicBarrier);
      }
    }
  }

  /**
   * Memstore whose {@code inMemoryCompaction} always meets a second party on
   * {@code inMemoryCompactionEndCyclicBarrier} after the parent implementation returns or throws.
   */
  public static class MyCompactingMemStore6 extends CompactingMemStore {
    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);

    public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator,
      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, cellComparator, store, regionServices, compactionPolicy);
    }

    @Override
    void inMemoryCompaction() {
      try {
        super.inMemoryCompaction();
      } finally {
        awaitBarrierUnchecked(inMemoryCompactionEndCyclicBarrier);
      }
    }
  }

  /**
   * Default memstore that interleaves the threads named {@code getScannerMyThread} and
   * {@code flushMyThread} around the first {@code doClearSnapShot} call. The
   * pre/post-clear handshake is skipped when the flush thread holds the store engine's write
   * lock at that point.
   */
  public static class MyDefaultMemStore extends DefaultMemStore {
    private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread";
    private static final String FLUSH_THREAD_NAME = "flushMyThread";
    /**
     * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner thread
     * could start.
     */
    private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2);
    /**
     * Used by getScanner thread notifies flush thread {@link DefaultMemStore#getSnapshotSegments}
     * completed, {@link DefaultMemStore#doClearSnapShot} could continue.
     */
    private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2);
    /**
     * Used by flush thread notifies getScanner thread {@link DefaultMemStore#doClearSnapShot}
     * completed, {@link DefaultMemStore#getScanners} could continue.
     */
    private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2);
    private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0);
    private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0);
    private volatile boolean shouldWait = true;
    private volatile HStore store = null;

    public MyDefaultMemStore(Configuration conf, CellComparator cellComparator,
      RegionServicesForStores regionServices) throws IOException {
      super(conf, cellComparator, regionServices);
    }

    @Override
    protected List<Segment> getSnapshotSegments() {

      List<Segment> result = super.getSnapshotSegments();

      if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) {
        int currentCount = getSnapshotSegmentsCounter.incrementAndGet();
        if (currentCount == 1 && this.shouldWait) {
          // Notify flush thread that DefaultMemStore#getSnapshotSegments completed, so
          // DefaultMemStore#doClearSnapShot could continue.
          awaitBarrierUnchecked(preClearSnapShotCyclicBarrier);
          // Wait for DefaultMemStore#doClearSnapShot to complete.
          awaitBarrierUnchecked(postClearSnapShotCyclicBarrier);
        }
      }
      return result;
    }

    @Override
    protected void doClearSnapShot() {
      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
        int currentCount = clearSnapshotCounter.incrementAndGet();
        if (currentCount == 1) {
          try {
            if (
              ((ReentrantReadWriteLock) store.getStoreEngine().getLock())
                .isWriteLockedByCurrentThread()
            ) {
              shouldWait = false;
            }
            // Only when flush thread enters DefaultMemStore#doClearSnapShot, getScanner
            // thread could start.
            getScannerCyclicBarrier.await();

            if (shouldWait) {
              // Wait for DefaultMemStore#getSnapshotSegments to complete.
              preClearSnapShotCyclicBarrier.await();
            }
          } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
          } catch (Throwable e) {
            // Deliberately broad: also wraps failures from the lock lookup above.
            throw new RuntimeException(e);
          }
        }
      }
      super.doClearSnapShot();

      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
        int currentCount = clearSnapshotCounter.get();
        if (currentCount == 1 && shouldWait) {
          // Notify getScanner thread that DefaultMemStore#doClearSnapShot completed, so
          // DefaultMemStore#getScanners could continue.
          awaitBarrierUnchecked(postClearSnapShotCyclicBarrier);
        }
      }
    }
  }

  /**
   * Waits on {@code barrier} and rethrows the checked exceptions of
   * {@link CyclicBarrier#await()} as a {@link RuntimeException} carrying the original cause.
   * The interrupt flag is restored before rethrowing an {@link InterruptedException}.
   * @param barrier the barrier to wait on
   * @throws RuntimeException if the wait is interrupted or the barrier is broken
   */
  private static void awaitBarrierUnchecked(CyclicBarrier barrier) {
    try {
      barrier.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (BrokenBarrierException e) {
      throw new RuntimeException(e);
    }
  }
}