001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY; 021import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_ON_READ_KEY; 022import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_READ; 023import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_WRITE; 024import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE; 025import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY; 026import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY; 027import static org.junit.Assert.assertArrayEquals; 028import static org.junit.Assert.assertEquals; 029import static org.junit.Assert.assertFalse; 030import static org.junit.Assert.assertNull; 031import static org.junit.Assert.assertTrue; 032import static org.junit.Assert.fail; 033import static org.mockito.ArgumentMatchers.any; 034import static org.mockito.Mockito.mock; 035import static org.mockito.Mockito.spy; 036import static 
org.mockito.Mockito.times; 037import static org.mockito.Mockito.verify; 038import static org.mockito.Mockito.when; 039 040import java.io.FileNotFoundException; 041import java.io.IOException; 042import java.lang.ref.SoftReference; 043import java.security.PrivilegedExceptionAction; 044import java.util.ArrayList; 045import java.util.Arrays; 046import java.util.Collection; 047import java.util.Collections; 048import java.util.Iterator; 049import java.util.List; 050import java.util.ListIterator; 051import java.util.NavigableSet; 052import java.util.Optional; 053import java.util.TreeSet; 054import java.util.concurrent.BrokenBarrierException; 055import java.util.concurrent.ConcurrentSkipListSet; 056import java.util.concurrent.CountDownLatch; 057import java.util.concurrent.CyclicBarrier; 058import java.util.concurrent.ExecutorService; 059import java.util.concurrent.Executors; 060import java.util.concurrent.Future; 061import java.util.concurrent.ThreadPoolExecutor; 062import java.util.concurrent.TimeUnit; 063import java.util.concurrent.atomic.AtomicBoolean; 064import java.util.concurrent.atomic.AtomicInteger; 065import java.util.concurrent.atomic.AtomicLong; 066import java.util.concurrent.atomic.AtomicReference; 067import java.util.concurrent.locks.ReentrantReadWriteLock; 068import java.util.function.Consumer; 069import java.util.function.IntBinaryOperator; 070import org.apache.hadoop.conf.Configuration; 071import org.apache.hadoop.fs.FSDataOutputStream; 072import org.apache.hadoop.fs.FileStatus; 073import org.apache.hadoop.fs.FileSystem; 074import org.apache.hadoop.fs.FilterFileSystem; 075import org.apache.hadoop.fs.LocalFileSystem; 076import org.apache.hadoop.fs.Path; 077import org.apache.hadoop.fs.permission.FsPermission; 078import org.apache.hadoop.hbase.Cell; 079import org.apache.hadoop.hbase.CellBuilderType; 080import org.apache.hadoop.hbase.CellComparator; 081import org.apache.hadoop.hbase.CellComparatorImpl; 082import org.apache.hadoop.hbase.CellUtil; 083import 
org.apache.hadoop.hbase.ExtendedCell; 084import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 085import org.apache.hadoop.hbase.HBaseClassTestRule; 086import org.apache.hadoop.hbase.HBaseConfiguration; 087import org.apache.hadoop.hbase.HBaseTestingUtil; 088import org.apache.hadoop.hbase.HConstants; 089import org.apache.hadoop.hbase.KeyValue; 090import org.apache.hadoop.hbase.MemoryCompactionPolicy; 091import org.apache.hadoop.hbase.NamespaceDescriptor; 092import org.apache.hadoop.hbase.PrivateCellUtil; 093import org.apache.hadoop.hbase.TableName; 094import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 095import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 096import org.apache.hadoop.hbase.client.Get; 097import org.apache.hadoop.hbase.client.RegionInfo; 098import org.apache.hadoop.hbase.client.RegionInfoBuilder; 099import org.apache.hadoop.hbase.client.Scan; 100import org.apache.hadoop.hbase.client.Scan.ReadType; 101import org.apache.hadoop.hbase.client.TableDescriptor; 102import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 103import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 104import org.apache.hadoop.hbase.filter.Filter; 105import org.apache.hadoop.hbase.filter.FilterBase; 106import org.apache.hadoop.hbase.io.compress.Compression; 107import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 108import org.apache.hadoop.hbase.io.hfile.CacheConfig; 109import org.apache.hadoop.hbase.io.hfile.HFile; 110import org.apache.hadoop.hbase.io.hfile.HFileContext; 111import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 112import org.apache.hadoop.hbase.monitoring.MonitoredTask; 113import org.apache.hadoop.hbase.nio.RefCnt; 114import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl; 115import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType; 116import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action; 117import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 118import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 119import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 120import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy; 121import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 122import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; 123import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 124import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 125import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 126import org.apache.hadoop.hbase.security.User; 127import org.apache.hadoop.hbase.testclassification.MediumTests; 128import org.apache.hadoop.hbase.testclassification.RegionServerTests; 129import org.apache.hadoop.hbase.util.BloomFilterUtil; 130import org.apache.hadoop.hbase.util.Bytes; 131import org.apache.hadoop.hbase.util.CommonFSUtils; 132import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 133import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 134import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 135import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 136import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 137import org.apache.hadoop.hbase.wal.WALFactory; 138import org.apache.hadoop.util.Progressable; 139import org.junit.After; 140import org.junit.AfterClass; 141import org.junit.Before; 142import org.junit.ClassRule; 143import org.junit.Rule; 144import org.junit.Test; 145import org.junit.experimental.categories.Category; 146import org.junit.rules.TestName; 147import org.mockito.Mockito; 148import org.slf4j.Logger; 149import org.slf4j.LoggerFactory; 150 151import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 152 153/** 154 * Test class for the HStore 155 */ 156@Category({ 
RegionServerTests.class, MediumTests.class }) 157public class TestHStore { 158 159 @ClassRule 160 public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHStore.class); 161 162 private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class); 163 @Rule 164 public TestName name = new TestName(); 165 166 HRegion region; 167 HStore store; 168 byte[] table = Bytes.toBytes("table"); 169 byte[] family = Bytes.toBytes("family"); 170 171 byte[] row = Bytes.toBytes("row"); 172 byte[] row2 = Bytes.toBytes("row2"); 173 byte[] qf1 = Bytes.toBytes("qf1"); 174 byte[] qf2 = Bytes.toBytes("qf2"); 175 byte[] qf3 = Bytes.toBytes("qf3"); 176 byte[] qf4 = Bytes.toBytes("qf4"); 177 byte[] qf5 = Bytes.toBytes("qf5"); 178 byte[] qf6 = Bytes.toBytes("qf6"); 179 180 NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR); 181 182 List<Cell> expected = new ArrayList<>(); 183 List<Cell> result = new ArrayList<>(); 184 185 long id = EnvironmentEdgeManager.currentTime(); 186 Get get = new Get(row); 187 188 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 189 private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString(); 190 191 @Before 192 public void setUp() throws IOException { 193 qualifiers.clear(); 194 qualifiers.add(qf1); 195 qualifiers.add(qf3); 196 qualifiers.add(qf5); 197 198 Iterator<byte[]> iter = qualifiers.iterator(); 199 while (iter.hasNext()) { 200 byte[] next = iter.next(); 201 expected.add(new KeyValue(row, family, next, 1, (byte[]) null)); 202 get.addColumn(family, next); 203 } 204 } 205 206 private void init(String methodName) throws IOException { 207 init(methodName, TEST_UTIL.getConfiguration()); 208 } 209 210 private HStore init(String methodName, Configuration conf) throws IOException { 211 // some of the tests write 4 versions and then flush 212 // (with HBASE-4241, lower versions are collected on flush) 213 return init(methodName, conf, 214 
ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build()); 215 } 216 217 private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd) 218 throws IOException { 219 return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd); 220 } 221 222 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 223 ColumnFamilyDescriptor hcd) throws IOException { 224 return init(methodName, conf, builder, hcd, null); 225 } 226 227 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 228 ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException { 229 return init(methodName, conf, builder, hcd, hook, false); 230 } 231 232 private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder, 233 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 234 TableDescriptor htd = builder.setColumnFamily(hcd).build(); 235 Path basedir = new Path(DIR + methodName); 236 Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName()); 237 final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName)); 238 239 FileSystem fs = FileSystem.get(conf); 240 241 fs.delete(logdir, true); 242 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 243 MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null, 244 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 245 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 246 Configuration walConf = new Configuration(conf); 247 CommonFSUtils.setRootDir(walConf, basedir); 248 WALFactory wals = new WALFactory(walConf, methodName); 249 region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf, 250 htd, null); 251 region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); 252 ThreadPoolExecutor pool = (ThreadPoolExecutor) 
Executors.newFixedThreadPool(1); 253 Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 254 } 255 256 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 257 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 258 initHRegion(methodName, conf, builder, hcd, hook, switchToPread); 259 if (hook == null) { 260 store = new HStore(region, hcd, conf, false); 261 } else { 262 store = new MyStore(region, hcd, conf, hook, switchToPread); 263 } 264 region.stores.put(store.getColumnFamilyDescriptor().getName(), store); 265 return store; 266 } 267 268 /** 269 * Test we do not lose data if we fail a flush and then close. Part of HBase-10466 270 */ 271 @Test 272 public void testFlushSizeSizing() throws Exception { 273 LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName()); 274 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 275 // Only retry once. 
276 conf.setInt("hbase.hstore.flush.retries.number", 1); 277 User user = User.createUserForTesting(conf, this.name.getMethodName(), new String[] { "foo" }); 278 // Inject our faulty LocalFileSystem 279 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 280 user.runAs(new PrivilegedExceptionAction<Object>() { 281 @Override 282 public Object run() throws Exception { 283 // Make sure it worked (above is sensitive to caching details in hadoop core) 284 FileSystem fs = FileSystem.get(conf); 285 assertEquals(FaultyFileSystem.class, fs.getClass()); 286 FaultyFileSystem ffs = (FaultyFileSystem) fs; 287 288 // Initialize region 289 init(name.getMethodName(), conf); 290 291 MemStoreSize mss = store.memstore.getFlushableSize(); 292 assertEquals(0, mss.getDataSize()); 293 LOG.info("Adding some data"); 294 MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing(); 295 store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize); 296 // add the heap size of active (mutable) segment 297 kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 298 mss = store.memstore.getFlushableSize(); 299 assertEquals(kvSize.getMemStoreSize(), mss); 300 // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. 
301 try { 302 LOG.info("Flushing"); 303 flushStore(store, id++); 304 fail("Didn't bubble up IOE!"); 305 } catch (IOException ioe) { 306 assertTrue(ioe.getMessage().contains("Fault injected")); 307 } 308 // due to snapshot, change mutable to immutable segment 309 kvSize.incMemStoreSize(0, 310 CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0); 311 mss = store.memstore.getFlushableSize(); 312 assertEquals(kvSize.getMemStoreSize(), mss); 313 MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing(); 314 store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2); 315 kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 316 // Even though we add a new kv, we expect the flushable size to be 'same' since we have 317 // not yet cleared the snapshot -- the above flush failed. 318 assertEquals(kvSize.getMemStoreSize(), mss); 319 ffs.fault.set(false); 320 flushStore(store, id++); 321 mss = store.memstore.getFlushableSize(); 322 // Size should be the foreground kv size. 
323 assertEquals(kvSize2.getMemStoreSize(), mss); 324 flushStore(store, id++); 325 mss = store.memstore.getFlushableSize(); 326 assertEquals(0, mss.getDataSize()); 327 assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize()); 328 return null; 329 } 330 }); 331 } 332 333 @Test 334 public void testStoreBloomFilterMetricsWithBloomRowCol() throws IOException { 335 int numStoreFiles = 5; 336 writeAndRead(BloomType.ROWCOL, numStoreFiles); 337 338 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 339 // hard to know exactly the numbers here, we are just trying to 340 // prove that they are incrementing 341 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 342 assertTrue(store.getBloomFilterNegativeResultsCount() > 0); 343 } 344 345 @Test 346 public void testStoreBloomFilterMetricsWithBloomRow() throws IOException { 347 int numStoreFiles = 5; 348 writeAndRead(BloomType.ROWCOL, numStoreFiles); 349 350 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 351 // hard to know exactly the numbers here, we are just trying to 352 // prove that they are incrementing 353 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 354 assertTrue(store.getBloomFilterNegativeResultsCount() > 0); 355 } 356 357 @Test 358 public void testStoreBloomFilterMetricsWithBloomRowPrefix() throws IOException { 359 int numStoreFiles = 5; 360 writeAndRead(BloomType.ROWPREFIX_FIXED_LENGTH, numStoreFiles); 361 362 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 363 // hard to know exactly the numbers here, we are just trying to 364 // prove that they are incrementing 365 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 366 } 367 368 @Test 369 public void testStoreBloomFilterMetricsWithBloomNone() throws IOException { 370 int numStoreFiles = 5; 371 writeAndRead(BloomType.NONE, numStoreFiles); 372 373 assertEquals(0, store.getBloomFilterRequestsCount()); 374 assertEquals(0, store.getBloomFilterNegativeResultsCount()); 375 
376 // hard to know exactly the numbers here, we are just trying to 377 // prove that they are incrementing 378 assertTrue(store.getBloomFilterEligibleRequestsCount() >= numStoreFiles); 379 } 380 381 private void writeAndRead(BloomType bloomType, int numStoreFiles) throws IOException { 382 Configuration conf = HBaseConfiguration.create(); 383 FileSystem fs = FileSystem.get(conf); 384 385 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 386 .setCompressionType(Compression.Algorithm.GZ).setBloomFilterType(bloomType) 387 .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "3").build(); 388 init(name.getMethodName(), conf, hcd); 389 390 for (int i = 1; i <= numStoreFiles; i++) { 391 byte[] row = Bytes.toBytes("row" + i); 392 LOG.info("Adding some data for the store file #" + i); 393 long timeStamp = EnvironmentEdgeManager.currentTime(); 394 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); 395 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); 396 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); 397 flush(i); 398 } 399 400 // Verify the total number of store files 401 assertEquals(numStoreFiles, this.store.getStorefiles().size()); 402 403 TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR); 404 columns.add(qf1); 405 406 for (int i = 1; i <= numStoreFiles; i++) { 407 KeyValueScanner scanner = 408 store.getScanner(new Scan(new Get(Bytes.toBytes("row" + i))), columns, 0); 409 scanner.peek(); 410 } 411 } 412 413 /** 414 * Verify that compression and data block encoding are respected by the createWriter method, used 415 * on store flush. 
416 */ 417 @Test 418 public void testCreateWriter() throws Exception { 419 Configuration conf = HBaseConfiguration.create(); 420 FileSystem fs = FileSystem.get(conf); 421 422 ColumnFamilyDescriptor hcd = 423 ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ) 424 .setDataBlockEncoding(DataBlockEncoding.DIFF).build(); 425 init(name.getMethodName(), conf, hcd); 426 427 // Test createWriter 428 StoreFileWriter writer = store.getStoreEngine() 429 .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4) 430 .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true) 431 .includesTag(false).shouldDropBehind(false)); 432 Path path = writer.getPath(); 433 writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1))); 434 writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2))); 435 writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3))); 436 writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4))); 437 writer.close(); 438 439 // Verify that compression and encoding settings are respected 440 HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf); 441 assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec()); 442 assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding()); 443 reader.close(); 444 } 445 446 @Test 447 public void testDeleteExpiredStoreFiles() throws Exception { 448 testDeleteExpiredStoreFiles(0); 449 testDeleteExpiredStoreFiles(1); 450 } 451 452 /** 453 * @param minVersions the MIN_VERSIONS for the column family 454 */ 455 public void testDeleteExpiredStoreFiles(int minVersions) throws Exception { 456 int storeFileNum = 4; 457 int ttl = 4; 458 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 459 EnvironmentEdgeManagerTestHelper.injectEdge(edge); 460 461 Configuration conf = HBaseConfiguration.create(); 462 // Enable the expired store file deletion 463 
conf.setBoolean("hbase.store.delete.expired.storefile", true); 464 // Set the compaction threshold higher to avoid normal compactions. 465 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5); 466 467 init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder 468 .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build()); 469 470 long storeTtl = this.store.getScanInfo().getTtl(); 471 long sleepTime = storeTtl / storeFileNum; 472 long timeStamp; 473 // There are 4 store files and the max time stamp difference among these 474 // store files will be (this.store.ttl / storeFileNum) 475 for (int i = 1; i <= storeFileNum; i++) { 476 LOG.info("Adding some data for the store file #" + i); 477 timeStamp = EnvironmentEdgeManager.currentTime(); 478 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); 479 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); 480 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); 481 flush(i); 482 edge.incrementTime(sleepTime); 483 } 484 485 // Verify the total number of store files 486 assertEquals(storeFileNum, this.store.getStorefiles().size()); 487 488 // Each call will find one expired store file and delete it before compaction happens. 489 // There will be no compaction due to threshold above. Last file will not be replaced. 490 for (int i = 1; i <= storeFileNum - 1; i++) { 491 // verify the expired store file. 492 assertFalse(this.store.requestCompaction().isPresent()); 493 Collection<HStoreFile> sfs = this.store.getStorefiles(); 494 // Ensure i files are gone. 495 if (minVersions == 0) { 496 assertEquals(storeFileNum - i, sfs.size()); 497 // Ensure only non-expired files remain. 
498 for (HStoreFile sf : sfs) { 499 assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl)); 500 } 501 } else { 502 assertEquals(storeFileNum, sfs.size()); 503 } 504 // Let the next store file expired. 505 edge.incrementTime(sleepTime); 506 } 507 assertFalse(this.store.requestCompaction().isPresent()); 508 509 Collection<HStoreFile> sfs = this.store.getStorefiles(); 510 // Assert the last expired file is not removed. 511 if (minVersions == 0) { 512 assertEquals(1, sfs.size()); 513 } 514 long ts = sfs.iterator().next().getReader().getMaxTimestamp(); 515 assertTrue(ts < (edge.currentTime() - storeTtl)); 516 517 for (HStoreFile sf : sfs) { 518 sf.closeStoreFile(true); 519 } 520 } 521 522 @Test 523 public void testLowestModificationTime() throws Exception { 524 Configuration conf = HBaseConfiguration.create(); 525 FileSystem fs = FileSystem.get(conf); 526 // Initialize region 527 init(name.getMethodName(), conf); 528 529 int storeFileNum = 4; 530 for (int i = 1; i <= storeFileNum; i++) { 531 LOG.info("Adding some data for the store file #" + i); 532 this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null); 533 this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null); 534 this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null); 535 flush(i); 536 } 537 // after flush; check the lowest time stamp 538 long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 539 long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 540 assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); 541 542 // after compact; check the lowest time stamp 543 store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null); 544 lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 545 lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 546 assertEquals(lowestTimeStampFromManager, 
lowestTimeStampFromFS); 547 } 548 549 private static long getLowestTimeStampFromFS(FileSystem fs, 550 final Collection<HStoreFile> candidates) throws IOException { 551 long minTs = Long.MAX_VALUE; 552 if (candidates.isEmpty()) { 553 return minTs; 554 } 555 Path[] p = new Path[candidates.size()]; 556 int i = 0; 557 for (HStoreFile sf : candidates) { 558 p[i] = sf.getPath(); 559 ++i; 560 } 561 562 FileStatus[] stats = fs.listStatus(p); 563 if (stats == null || stats.length == 0) { 564 return minTs; 565 } 566 for (FileStatus s : stats) { 567 minTs = Math.min(minTs, s.getModificationTime()); 568 } 569 return minTs; 570 } 571 572 ////////////////////////////////////////////////////////////////////////////// 573 // Get tests 574 ////////////////////////////////////////////////////////////////////////////// 575 576 private static final int BLOCKSIZE_SMALL = 8192; 577 578 /** 579 * Test for hbase-1686. 580 */ 581 @Test 582 public void testEmptyStoreFile() throws IOException { 583 init(this.name.getMethodName()); 584 // Write a store file. 585 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 586 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 587 flush(1); 588 // Now put in place an empty store file. Its a little tricky. Have to 589 // do manually with hacked in sequence id. 590 HStoreFile f = this.store.getStorefiles().iterator().next(); 591 Path storedir = f.getPath().getParent(); 592 long seqid = f.getMaxSequenceId(); 593 Configuration c = HBaseConfiguration.create(); 594 FileSystem fs = FileSystem.get(c); 595 HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 596 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs) 597 .withOutputDir(storedir).withFileContext(meta).build(); 598 w.appendMetadata(seqid + 1, false); 599 w.close(); 600 this.store.close(); 601 // Reopen it... 
should pick up two files 602 this.store = 603 new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false); 604 assertEquals(2, this.store.getStorefilesCount()); 605 606 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 607 assertEquals(1, result.size()); 608 } 609 610 /** 611 * Getting data from memstore only 612 */ 613 @Test 614 public void testGet_FromMemStoreOnly() throws IOException { 615 init(this.name.getMethodName()); 616 617 // Put data in memstore 618 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 619 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 620 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 621 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 622 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 623 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 624 625 // Get 626 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 627 628 // Compare 629 assertCheck(); 630 } 631 632 @Test 633 public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException { 634 testTimeRangeIfSomeCellsAreDroppedInFlush(1); 635 testTimeRangeIfSomeCellsAreDroppedInFlush(3); 636 testTimeRangeIfSomeCellsAreDroppedInFlush(5); 637 } 638 639 private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException { 640 init(this.name.getMethodName(), TEST_UTIL.getConfiguration(), 641 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build()); 642 long currentTs = 100; 643 long minTs = currentTs; 644 // the extra cell won't be flushed to disk, 645 // so the min of timerange will be different between memStore and hfile. 
646 for (int i = 0; i != (maxVersion + 1); ++i) { 647 this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null); 648 if (i == 1) { 649 minTs = currentTs; 650 } 651 } 652 flushStore(store, id++); 653 654 Collection<HStoreFile> files = store.getStorefiles(); 655 assertEquals(1, files.size()); 656 HStoreFile f = files.iterator().next(); 657 f.initReader(); 658 StoreFileReader reader = f.getReader(); 659 assertEquals(minTs, reader.timeRange.getMin()); 660 assertEquals(currentTs, reader.timeRange.getMax()); 661 } 662 663 /** 664 * Getting data from files only 665 */ 666 @Test 667 public void testGet_FromFilesOnly() throws IOException { 668 init(this.name.getMethodName()); 669 670 // Put data in memstore 671 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 672 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 673 // flush 674 flush(1); 675 676 // Add more data 677 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 678 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 679 // flush 680 flush(2); 681 682 // Add more data 683 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 684 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 685 // flush 686 flush(3); 687 688 // Get 689 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 690 // this.store.get(get, qualifiers, result); 691 692 // Need to sort the result since multiple files 693 Collections.sort(result, CellComparatorImpl.COMPARATOR); 694 695 // Compare 696 assertCheck(); 697 } 698 699 /** 700 * Getting data from memstore and files 701 */ 702 @Test 703 public void testGet_FromMemStoreAndFiles() throws IOException { 704 init(this.name.getMethodName()); 705 706 // Put data in memstore 707 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 708 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 709 // flush 
710 flush(1); 711 712 // Add more data 713 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 714 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 715 // flush 716 flush(2); 717 718 // Add more data 719 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 720 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 721 722 // Get 723 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 724 725 // Need to sort the result since multiple files 726 Collections.sort(result, CellComparatorImpl.COMPARATOR); 727 728 // Compare 729 assertCheck(); 730 } 731 732 private void flush(int storeFilessize) throws IOException { 733 flushStore(store, id++); 734 assertEquals(storeFilessize, this.store.getStorefiles().size()); 735 assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount()); 736 } 737 738 private void assertCheck() { 739 assertEquals(expected.size(), result.size()); 740 for (int i = 0; i < expected.size(); i++) { 741 assertEquals(expected.get(i), result.get(i)); 742 } 743 } 744 745 @After 746 public void tearDown() throws Exception { 747 EnvironmentEdgeManagerTestHelper.reset(); 748 if (store != null) { 749 try { 750 store.close(); 751 } catch (IOException e) { 752 } 753 store = null; 754 } 755 if (region != null) { 756 region.close(); 757 region = null; 758 } 759 } 760 761 @AfterClass 762 public static void tearDownAfterClass() throws IOException { 763 TEST_UTIL.cleanupTestDir(); 764 } 765 766 @Test 767 public void testHandleErrorsInFlush() throws Exception { 768 LOG.info("Setting up a faulty file system that cannot write"); 769 770 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 771 User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" }); 772 // Inject our faulty LocalFileSystem 773 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 774 
user.runAs(new PrivilegedExceptionAction<Object>() { 775 @Override 776 public Object run() throws Exception { 777 // Make sure it worked (above is sensitive to caching details in hadoop core) 778 FileSystem fs = FileSystem.get(conf); 779 assertEquals(FaultyFileSystem.class, fs.getClass()); 780 781 // Initialize region 782 init(name.getMethodName(), conf); 783 784 LOG.info("Adding some data"); 785 store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 786 store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 787 store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 788 789 LOG.info("Before flush, we should have no files"); 790 791 StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, store.getStoreContext()); 792 Collection<StoreFileInfo> files = sft.load(); 793 assertEquals(0, files != null ? files.size() : 0); 794 795 // flush 796 try { 797 LOG.info("Flushing"); 798 flush(1); 799 fail("Didn't bubble up IOE!"); 800 } catch (IOException ioe) { 801 assertTrue(ioe.getMessage().contains("Fault injected")); 802 } 803 804 LOG.info("After failed flush, we should still have no files!"); 805 files = sft.load(); 806 assertEquals(0, files != null ? files.size() : 0); 807 store.getHRegion().getWAL().close(); 808 return null; 809 } 810 }); 811 FileSystem.closeAllForUGI(user.getUGI()); 812 } 813 814 /** 815 * Faulty file system that will fail if you write past its fault position the FIRST TIME only; 816 * thereafter it will succeed. Used by {@link TestHRegion} too. 
817 */ 818 static class FaultyFileSystem extends FilterFileSystem { 819 List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>(); 820 private long faultPos = 200; 821 AtomicBoolean fault = new AtomicBoolean(true); 822 823 public FaultyFileSystem() { 824 super(new LocalFileSystem()); 825 LOG.info("Creating faulty!"); 826 } 827 828 @Override 829 public FSDataOutputStream create(Path p) throws IOException { 830 return new FaultyOutputStream(super.create(p), faultPos, fault); 831 } 832 833 @Override 834 public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, 835 int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { 836 return new FaultyOutputStream( 837 super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress), 838 faultPos, fault); 839 } 840 841 @Override 842 public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize, 843 short replication, long blockSize, Progressable progress) throws IOException { 844 // Fake it. Call create instead. The default implementation throws an IOE 845 // that this is not supported. 
846 return create(f, overwrite, bufferSize, replication, blockSize, progress); 847 } 848 } 849 850 static class FaultyOutputStream extends FSDataOutputStream { 851 volatile long faultPos = Long.MAX_VALUE; 852 private final AtomicBoolean fault; 853 854 public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault) 855 throws IOException { 856 super(out, null); 857 this.faultPos = faultPos; 858 this.fault = fault; 859 } 860 861 @Override 862 public synchronized void write(byte[] buf, int offset, int length) throws IOException { 863 LOG.info("faulty stream write at pos " + getPos()); 864 injectFault(); 865 super.write(buf, offset, length); 866 } 867 868 private void injectFault() throws IOException { 869 if (this.fault.get() && getPos() >= faultPos) { 870 throw new IOException("Fault injected"); 871 } 872 } 873 } 874 875 private static StoreFlushContext flushStore(HStore store, long id) throws IOException { 876 StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY); 877 storeFlushCtx.prepare(); 878 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 879 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 880 return storeFlushCtx; 881 } 882 883 /** 884 * Generate a list of KeyValues for testing based on given parameters 885 * @return the rows key-value list 886 */ 887 private List<ExtendedCell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier, 888 byte[] family) { 889 List<ExtendedCell> kvList = new ArrayList<>(); 890 for (int i = 1; i <= numRows; i++) { 891 byte[] b = Bytes.toBytes(i); 892 for (long timestamp : timestamps) { 893 kvList.add(new KeyValue(b, family, qualifier, timestamp, b)); 894 } 895 } 896 return kvList; 897 } 898 899 /** 900 * Test to ensure correctness when using Stores with multiple timestamps 901 */ 902 @Test 903 public void testMultipleTimestamps() throws IOException { 904 int numRows = 1; 905 long[] timestamps1 = new long[] { 1, 5, 10, 20 }; 906 long[] 
timestamps2 = new long[] { 30, 80 }; 907 908 init(this.name.getMethodName()); 909 910 List<ExtendedCell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family); 911 for (ExtendedCell kv : kvList1) { 912 this.store.add(kv, null); 913 } 914 915 flushStore(store, id++); 916 917 List<ExtendedCell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family); 918 for (ExtendedCell kv : kvList2) { 919 this.store.add(kv, null); 920 } 921 922 List<Cell> result; 923 Get get = new Get(Bytes.toBytes(1)); 924 get.addColumn(family, qf1); 925 926 get.setTimeRange(0, 15); 927 result = HBaseTestingUtil.getFromStoreFile(store, get); 928 assertTrue(result.size() > 0); 929 930 get.setTimeRange(40, 90); 931 result = HBaseTestingUtil.getFromStoreFile(store, get); 932 assertTrue(result.size() > 0); 933 934 get.setTimeRange(10, 45); 935 result = HBaseTestingUtil.getFromStoreFile(store, get); 936 assertTrue(result.size() > 0); 937 938 get.setTimeRange(80, 145); 939 result = HBaseTestingUtil.getFromStoreFile(store, get); 940 assertTrue(result.size() > 0); 941 942 get.setTimeRange(1, 2); 943 result = HBaseTestingUtil.getFromStoreFile(store, get); 944 assertTrue(result.size() > 0); 945 946 get.setTimeRange(90, 200); 947 result = HBaseTestingUtil.getFromStoreFile(store, get); 948 assertTrue(result.size() == 0); 949 } 950 951 /** 952 * Test for HBASE-3492 - Test split on empty colfam (no store files). 953 * @throws IOException When the IO operations fail. 
954 */ 955 @Test 956 public void testSplitWithEmptyColFam() throws IOException { 957 init(this.name.getMethodName()); 958 assertFalse(store.getSplitPoint().isPresent()); 959 } 960 961 @Test 962 public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception { 963 final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle"; 964 long anyValue = 10; 965 966 // We'll check that it uses correct config and propagates it appropriately by going thru 967 // the simplest "real" path I can find - "throttleCompaction", which just checks whether 968 // a number we pass in is higher than some config value, inside compactionPolicy. 969 Configuration conf = HBaseConfiguration.create(); 970 conf.setLong(CONFIG_KEY, anyValue); 971 init(name.getMethodName() + "-xml", conf); 972 assertTrue(store.throttleCompaction(anyValue + 1)); 973 assertFalse(store.throttleCompaction(anyValue)); 974 975 // HTD overrides XML. 976 --anyValue; 977 init( 978 name.getMethodName() + "-htd", conf, TableDescriptorBuilder 979 .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)), 980 ColumnFamilyDescriptorBuilder.of(family)); 981 assertTrue(store.throttleCompaction(anyValue + 1)); 982 assertFalse(store.throttleCompaction(anyValue)); 983 984 // HCD overrides them both. 
985 --anyValue; 986 init(name.getMethodName() + "-hcd", conf, 987 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, 988 Long.toString(anyValue)), 989 ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue)) 990 .build()); 991 assertTrue(store.throttleCompaction(anyValue + 1)); 992 assertFalse(store.throttleCompaction(anyValue)); 993 } 994 995 public static class DummyStoreEngine extends DefaultStoreEngine { 996 public static DefaultCompactor lastCreatedCompactor = null; 997 998 @Override 999 protected void createComponents(Configuration conf, HStore store, CellComparator comparator) 1000 throws IOException { 1001 super.createComponents(conf, store, comparator); 1002 lastCreatedCompactor = this.compactor; 1003 } 1004 } 1005 1006 @Test 1007 public void testStoreUsesSearchEngineOverride() throws Exception { 1008 Configuration conf = HBaseConfiguration.create(); 1009 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName()); 1010 init(this.name.getMethodName(), conf); 1011 assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor()); 1012 } 1013 1014 private void addStoreFile() throws IOException { 1015 HStoreFile f = this.store.getStorefiles().iterator().next(); 1016 Path storedir = f.getPath().getParent(); 1017 long seqid = this.store.getMaxSequenceId().orElse(0L); 1018 Configuration c = TEST_UTIL.getConfiguration(); 1019 FileSystem fs = FileSystem.get(c); 1020 HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 1021 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs) 1022 .withOutputDir(storedir).withFileContext(fileContext).build(); 1023 w.appendMetadata(seqid + 1, false); 1024 w.close(); 1025 LOG.info("Added store file:" + w.getPath()); 1026 } 1027 1028 private void archiveStoreFile(int index) throws IOException { 1029 Collection<HStoreFile> files = this.store.getStorefiles(); 1030 
HStoreFile sf = null; 1031 Iterator<HStoreFile> it = files.iterator(); 1032 for (int i = 0; i <= index; i++) { 1033 sf = it.next(); 1034 } 1035 store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), 1036 Lists.newArrayList(sf)); 1037 } 1038 1039 private void closeCompactedFile(int index) throws IOException { 1040 Collection<HStoreFile> files = 1041 this.store.getStoreEngine().getStoreFileManager().getCompactedfiles(); 1042 if (files.size() > 0) { 1043 HStoreFile sf = null; 1044 Iterator<HStoreFile> it = files.iterator(); 1045 for (int i = 0; i <= index; i++) { 1046 sf = it.next(); 1047 } 1048 sf.closeStoreFile(true); 1049 store.getStoreEngine().getStoreFileManager() 1050 .removeCompactedFiles(Collections.singletonList(sf)); 1051 } 1052 } 1053 1054 @Test 1055 public void testRefreshStoreFiles() throws Exception { 1056 init(name.getMethodName()); 1057 1058 assertEquals(0, this.store.getStorefilesCount()); 1059 1060 // Test refreshing store files when no store files are there 1061 store.refreshStoreFiles(); 1062 assertEquals(0, this.store.getStorefilesCount()); 1063 1064 // add some data, flush 1065 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 1066 flush(1); 1067 assertEquals(1, this.store.getStorefilesCount()); 1068 1069 // add one more file 1070 addStoreFile(); 1071 1072 assertEquals(1, this.store.getStorefilesCount()); 1073 store.refreshStoreFiles(); 1074 assertEquals(2, this.store.getStorefilesCount()); 1075 1076 // add three more files 1077 addStoreFile(); 1078 addStoreFile(); 1079 addStoreFile(); 1080 1081 assertEquals(2, this.store.getStorefilesCount()); 1082 store.refreshStoreFiles(); 1083 assertEquals(5, this.store.getStorefilesCount()); 1084 1085 closeCompactedFile(0); 1086 archiveStoreFile(0); 1087 1088 assertEquals(5, this.store.getStorefilesCount()); 1089 store.refreshStoreFiles(); 1090 assertEquals(4, this.store.getStorefilesCount()); 1091 1092 archiveStoreFile(0); 1093 archiveStoreFile(1); 1094 
archiveStoreFile(2); 1095 1096 assertEquals(4, this.store.getStorefilesCount()); 1097 store.refreshStoreFiles(); 1098 assertEquals(1, this.store.getStorefilesCount()); 1099 1100 archiveStoreFile(0); 1101 store.refreshStoreFiles(); 1102 assertEquals(0, this.store.getStorefilesCount()); 1103 } 1104 1105 @Test 1106 public void testRefreshStoreFilesNotChanged() throws IOException { 1107 init(name.getMethodName()); 1108 1109 assertEquals(0, this.store.getStorefilesCount()); 1110 1111 // add some data, flush 1112 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 1113 flush(1); 1114 // add one more file 1115 addStoreFile(); 1116 1117 StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine()); 1118 1119 // call first time after files changed 1120 spiedStoreEngine.refreshStoreFiles(); 1121 assertEquals(2, this.store.getStorefilesCount()); 1122 verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any()); 1123 1124 // call second time 1125 spiedStoreEngine.refreshStoreFiles(); 1126 1127 // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not 1128 // refreshed, 1129 verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any()); 1130 } 1131 1132 @Test 1133 public void testScanWithCompactionAfterFlush() throws Exception { 1134 TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, 1135 EverythingPolicy.class.getName()); 1136 init(name.getMethodName()); 1137 1138 assertEquals(0, this.store.getStorefilesCount()); 1139 1140 KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null); 1141 // add some data, flush 1142 this.store.add(kv, null); 1143 flush(1); 1144 kv = new KeyValue(row, family, qf2, 1, (byte[]) null); 1145 // add some data, flush 1146 this.store.add(kv, null); 1147 flush(2); 1148 kv = new KeyValue(row, family, qf3, 1, (byte[]) null); 1149 // add some data, flush 1150 this.store.add(kv, null); 1151 flush(3); 1152 1153 ExecutorService 
service = Executors.newFixedThreadPool(2); 1154 1155 Scan scan = new Scan(new Get(row)); 1156 Future<KeyValueScanner> scanFuture = service.submit(() -> { 1157 try { 1158 LOG.info(">>>> creating scanner"); 1159 return this.store.createScanner(scan, 1160 new ScanInfo(HBaseConfiguration.create(), 1161 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(), 1162 Long.MAX_VALUE, 0, CellComparator.getInstance()), 1163 scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0); 1164 } catch (IOException e) { 1165 e.printStackTrace(); 1166 return null; 1167 } 1168 }); 1169 Future compactFuture = service.submit(() -> { 1170 try { 1171 LOG.info(">>>>>> starting compaction"); 1172 Optional<CompactionContext> opCompaction = this.store.requestCompaction(); 1173 assertTrue(opCompaction.isPresent()); 1174 store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent()); 1175 LOG.info(">>>>>> Compaction is finished"); 1176 this.store.closeAndArchiveCompactedFiles(); 1177 LOG.info(">>>>>> Compacted files deleted"); 1178 } catch (IOException e) { 1179 e.printStackTrace(); 1180 } 1181 }); 1182 1183 KeyValueScanner kvs = scanFuture.get(); 1184 compactFuture.get(); 1185 ((StoreScanner) kvs).currentScanners.forEach(s -> { 1186 if (s instanceof StoreFileScanner) { 1187 assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount()); 1188 } 1189 }); 1190 kvs.seek(kv); 1191 service.shutdownNow(); 1192 } 1193 1194 private long countMemStoreScanner(StoreScanner scanner) { 1195 if (scanner.currentScanners == null) { 1196 return 0; 1197 } 1198 return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count(); 1199 } 1200 1201 @Test 1202 public void testNumberOfMemStoreScannersAfterFlush() throws IOException { 1203 long seqId = 100; 1204 long timestamp = EnvironmentEdgeManager.currentTime(); 1205 ExtendedCell cell0 = 1206 ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1207 
.setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1208 PrivateCellUtil.setSequenceId(cell0, seqId); 1209 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList()); 1210 1211 ExtendedCell cell1 = 1212 ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1213 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1214 PrivateCellUtil.setSequenceId(cell1, seqId); 1215 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1)); 1216 1217 seqId = 101; 1218 timestamp = EnvironmentEdgeManager.currentTime(); 1219 ExtendedCell cell2 = 1220 ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family) 1221 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1222 PrivateCellUtil.setSequenceId(cell2, seqId); 1223 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2)); 1224 } 1225 1226 private void testNumberOfMemStoreScannersAfterFlush(List<ExtendedCell> inputCellsBeforeSnapshot, 1227 List<ExtendedCell> inputCellsAfterSnapshot) throws IOException { 1228 init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size()); 1229 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1230 long seqId = Long.MIN_VALUE; 1231 for (ExtendedCell c : inputCellsBeforeSnapshot) { 1232 quals.add(CellUtil.cloneQualifier(c)); 1233 seqId = Math.max(seqId, c.getSequenceId()); 1234 } 1235 for (ExtendedCell c : inputCellsAfterSnapshot) { 1236 quals.add(CellUtil.cloneQualifier(c)); 1237 seqId = Math.max(seqId, c.getSequenceId()); 1238 } 1239 inputCellsBeforeSnapshot.forEach(c -> store.add(c, null)); 1240 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1241 storeFlushCtx.prepare(); 1242 inputCellsAfterSnapshot.forEach(c -> store.add(c, null)); 1243 int 
numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2; 1244 try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) { 1245 // snapshot + active (if inputCellsAfterSnapshot isn't empty) 1246 assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s)); 1247 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1248 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1249 // snapshot has no data after flush 1250 int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1; 1251 boolean more; 1252 int cellCount = 0; 1253 do { 1254 List<Cell> cells = new ArrayList<>(); 1255 more = s.next(cells); 1256 cellCount += cells.size(); 1257 assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s)); 1258 } while (more); 1259 assertEquals( 1260 "The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size() 1261 + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(), 1262 inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount); 1263 // the current scanners is cleared 1264 assertEquals(0, countMemStoreScanner(s)); 1265 } 1266 } 1267 1268 private ExtendedCell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) 1269 throws IOException { 1270 return createCell(row, qualifier, ts, sequenceId, value); 1271 } 1272 1273 private ExtendedCell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, 1274 byte[] value) throws IOException { 1275 return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row) 1276 .setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put) 1277 .setValue(value).setSequenceId(sequenceId).build(); 1278 } 1279 1280 @Test 1281 public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException { 1282 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1283 final int expectedSize = 3; 1284 
testFlushBeforeCompletingScan(new MyListHook() { 1285 @Override 1286 public void hook(int currentSize) { 1287 if (currentSize == expectedSize - 1) { 1288 try { 1289 flushStore(store, id++); 1290 timeToGoNextRow.set(true); 1291 } catch (IOException e) { 1292 throw new RuntimeException(e); 1293 } 1294 } 1295 } 1296 }, new FilterBase() { 1297 @Override 1298 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1299 return ReturnCode.INCLUDE; 1300 } 1301 }, expectedSize); 1302 } 1303 1304 @Test 1305 public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException { 1306 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1307 final int expectedSize = 2; 1308 testFlushBeforeCompletingScan(new MyListHook() { 1309 @Override 1310 public void hook(int currentSize) { 1311 if (currentSize == expectedSize - 1) { 1312 try { 1313 flushStore(store, id++); 1314 timeToGoNextRow.set(true); 1315 } catch (IOException e) { 1316 throw new RuntimeException(e); 1317 } 1318 } 1319 } 1320 }, new FilterBase() { 1321 @Override 1322 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1323 if (timeToGoNextRow.get()) { 1324 timeToGoNextRow.set(false); 1325 return ReturnCode.NEXT_ROW; 1326 } else { 1327 return ReturnCode.INCLUDE; 1328 } 1329 } 1330 }, expectedSize); 1331 } 1332 1333 @Test 1334 public void testFlushBeforeCompletingScanWithFilterHint() 1335 throws IOException, InterruptedException { 1336 final AtomicBoolean timeToGetHint = new AtomicBoolean(false); 1337 final int expectedSize = 2; 1338 testFlushBeforeCompletingScan(new MyListHook() { 1339 @Override 1340 public void hook(int currentSize) { 1341 if (currentSize == expectedSize - 1) { 1342 try { 1343 flushStore(store, id++); 1344 timeToGetHint.set(true); 1345 } catch (IOException e) { 1346 throw new RuntimeException(e); 1347 } 1348 } 1349 } 1350 }, new FilterBase() { 1351 @Override 1352 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 
1353 if (timeToGetHint.get()) { 1354 timeToGetHint.set(false); 1355 return Filter.ReturnCode.SEEK_NEXT_USING_HINT; 1356 } else { 1357 return Filter.ReturnCode.INCLUDE; 1358 } 1359 } 1360 1361 @Override 1362 public Cell getNextCellHint(Cell currentCell) throws IOException { 1363 return currentCell; 1364 } 1365 }, expectedSize); 1366 } 1367 1368 private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize) 1369 throws IOException, InterruptedException { 1370 Configuration conf = HBaseConfiguration.create(); 1371 byte[] r0 = Bytes.toBytes("row0"); 1372 byte[] r1 = Bytes.toBytes("row1"); 1373 byte[] r2 = Bytes.toBytes("row2"); 1374 byte[] value0 = Bytes.toBytes("value0"); 1375 byte[] value1 = Bytes.toBytes("value1"); 1376 byte[] value2 = Bytes.toBytes("value2"); 1377 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1378 long ts = EnvironmentEdgeManager.currentTime(); 1379 long seqId = 100; 1380 init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1381 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(), 1382 new MyStoreHook() { 1383 @Override 1384 public long getSmallestReadPoint(HStore store) { 1385 return seqId + 3; 1386 } 1387 }); 1388 // The cells having the value0 won't be flushed to disk because the value of max version is 1 1389 store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing); 1390 store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing); 1391 store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing); 1392 store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing); 1393 store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing); 1394 store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing); 1395 store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing); 1396 store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing); 1397 
store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing); 1398 store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing); 1399 store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing); 1400 store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing); 1401 List<Cell> myList = new MyList<>(hook); 1402 Scan scan = new Scan().withStartRow(r1).setFilter(filter); 1403 try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) { 1404 // r1 1405 scanner.next(myList); 1406 assertEquals(expectedSize, myList.size()); 1407 for (Cell c : myList) { 1408 byte[] actualValue = CellUtil.cloneValue(c); 1409 assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:" 1410 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1)); 1411 } 1412 List<Cell> normalList = new ArrayList<>(3); 1413 // r2 1414 scanner.next(normalList); 1415 assertEquals(3, normalList.size()); 1416 for (Cell c : normalList) { 1417 byte[] actualValue = CellUtil.cloneValue(c); 1418 assertTrue("expected:" + Bytes.toStringBinary(value2) + ", actual:" 1419 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value2)); 1420 } 1421 } 1422 } 1423 1424 @Test 1425 public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException { 1426 Configuration conf = HBaseConfiguration.create(); 1427 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName()); 1428 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1429 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1430 byte[] value = Bytes.toBytes("value"); 1431 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1432 long ts = EnvironmentEdgeManager.currentTime(); 1433 long seqId = 100; 1434 // older data whihc shouldn't be "seen" by client 1435 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1436 store.add(createCell(qf2, ts, seqId, 
value), memStoreSizing); 1437 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1438 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1439 quals.add(qf1); 1440 quals.add(qf2); 1441 quals.add(qf3); 1442 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1443 MyCompactingMemStore.START_TEST.set(true); 1444 Runnable flush = () -> { 1445 // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5) 1446 // recreate the active memstore -- phase (4/5) 1447 storeFlushCtx.prepare(); 1448 }; 1449 ExecutorService service = Executors.newSingleThreadExecutor(); 1450 service.execute(flush); 1451 // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5) 1452 // this is blocked until we recreate the active memstore -- phase (3/5) 1453 // we get scanner from active memstore but it is empty -- phase (5/5) 1454 InternalScanner scanner = 1455 (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 1456 service.shutdown(); 1457 service.awaitTermination(20, TimeUnit.SECONDS); 1458 try { 1459 try { 1460 List<Cell> results = new ArrayList<>(); 1461 scanner.next(results); 1462 assertEquals(3, results.size()); 1463 for (Cell c : results) { 1464 byte[] actualValue = CellUtil.cloneValue(c); 1465 assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:" 1466 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value)); 1467 } 1468 } finally { 1469 scanner.close(); 1470 } 1471 } finally { 1472 MyCompactingMemStore.START_TEST.set(false); 1473 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1474 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1475 } 1476 } 1477 1478 @Test 1479 public void testScanWithDoubleFlush() throws IOException { 1480 Configuration conf = HBaseConfiguration.create(); 1481 // Initialize region 1482 MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() { 1483 @Override 1484 
public void getScanners(MyStore store) throws IOException { 1485 final long tmpId = id++; 1486 ExecutorService s = Executors.newSingleThreadExecutor(); 1487 s.execute(() -> { 1488 try { 1489 // flush the store before storescanner updates the scanners from store. 1490 // The current data will be flushed into files, and the memstore will 1491 // be clear. 1492 // -- phase (4/4) 1493 flushStore(store, tmpId); 1494 } catch (IOException ex) { 1495 throw new RuntimeException(ex); 1496 } 1497 }); 1498 s.shutdown(); 1499 try { 1500 // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers. 1501 s.awaitTermination(3, TimeUnit.SECONDS); 1502 } catch (InterruptedException ex) { 1503 } 1504 } 1505 }); 1506 byte[] oldValue = Bytes.toBytes("oldValue"); 1507 byte[] currentValue = Bytes.toBytes("currentValue"); 1508 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1509 long ts = EnvironmentEdgeManager.currentTime(); 1510 long seqId = 100; 1511 // older data whihc shouldn't be "seen" by client 1512 myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing); 1513 myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing); 1514 myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing); 1515 long snapshotId = id++; 1516 // push older data into snapshot -- phase (1/4) 1517 StoreFlushContext storeFlushCtx = 1518 store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY); 1519 storeFlushCtx.prepare(); 1520 1521 // insert current data into active -- phase (2/4) 1522 myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing); 1523 myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing); 1524 myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing); 1525 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1526 quals.add(qf1); 1527 quals.add(qf2); 1528 quals.add(qf3); 1529 try (InternalScanner scanner = 1530 (InternalScanner) 
myStore.getScanner(new Scan(new Get(row)), quals, seqId + 1)) { 1531 // complete the flush -- phase (3/4) 1532 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1533 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1534 1535 List<Cell> results = new ArrayList<>(); 1536 scanner.next(results); 1537 assertEquals(3, results.size()); 1538 for (Cell c : results) { 1539 byte[] actualValue = CellUtil.cloneValue(c); 1540 assertTrue("expected:" + Bytes.toStringBinary(currentValue) + ", actual:" 1541 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, currentValue)); 1542 } 1543 } 1544 } 1545 1546 /** 1547 * This test is for HBASE-27519, when the {@link StoreScanner} is scanning,the Flush and the 1548 * Compaction execute concurrently and theCcompaction compact and archive the flushed 1549 * {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}.Before 1550 * HBASE-27519,{@link StoreScanner.updateReaders} would throw {@link FileNotFoundException}. 1551 */ 1552 @Test 1553 public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException { 1554 Configuration conf = HBaseConfiguration.create(); 1555 conf.setBoolean(WALFactory.WAL_ENABLED, false); 1556 conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName()); 1557 byte[] r0 = Bytes.toBytes("row0"); 1558 byte[] r1 = Bytes.toBytes("row1"); 1559 final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); 1560 final AtomicBoolean shouldWaitRef = new AtomicBoolean(false); 1561 // Initialize region 1562 final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() { 1563 @Override 1564 public void getScanners(MyStore store) throws IOException { 1565 try { 1566 // Here this method is called by StoreScanner.updateReaders which is invoked by the 1567 // following TestHStore.flushStore 1568 if (shouldWaitRef.get()) { 1569 // wait the following compaction Task start 1570 cyclicBarrier.await(); 1571 // wait the following 
HStore.closeAndArchiveCompactedFiles end. 1572 cyclicBarrier.await(); 1573 } 1574 } catch (BrokenBarrierException | InterruptedException e) { 1575 throw new RuntimeException(e); 1576 } 1577 } 1578 }); 1579 1580 final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null); 1581 Runnable compactionTask = () -> { 1582 try { 1583 // Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for 1584 // entering the MyStore.getScanners, compactionTask could start. 1585 cyclicBarrier.await(); 1586 region.compactStore(family, new NoLimitThroughputController()); 1587 myStore.closeAndArchiveCompactedFiles(); 1588 // Notify StoreScanner.updateReaders could enter MyStore.getScanners. 1589 cyclicBarrier.await(); 1590 } catch (Throwable e) { 1591 compactionExceptionRef.set(e); 1592 } 1593 }; 1594 1595 long ts = EnvironmentEdgeManager.currentTime(); 1596 long seqId = 100; 1597 byte[] value = Bytes.toBytes("value"); 1598 // older data whihc shouldn't be "seen" by client 1599 myStore.add(createCell(r0, qf1, ts, seqId, value), null); 1600 flushStore(myStore, id++); 1601 myStore.add(createCell(r0, qf2, ts, seqId, value), null); 1602 flushStore(myStore, id++); 1603 myStore.add(createCell(r0, qf3, ts, seqId, value), null); 1604 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1605 quals.add(qf1); 1606 quals.add(qf2); 1607 quals.add(qf3); 1608 1609 myStore.add(createCell(r1, qf1, ts, seqId, value), null); 1610 myStore.add(createCell(r1, qf2, ts, seqId, value), null); 1611 myStore.add(createCell(r1, qf3, ts, seqId, value), null); 1612 1613 Thread.currentThread() 1614 .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread"); 1615 Scan scan = new Scan(); 1616 scan.withStartRow(r0, true); 1617 try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) { 1618 List<Cell> results = new MyList<>(size -> { 1619 switch (size) { 1620 case 1: 1621 shouldWaitRef.set(true); 
1622 Thread thread = new Thread(compactionTask); 1623 thread.setName("MyCompacting Thread."); 1624 thread.start(); 1625 try { 1626 flushStore(myStore, id++); 1627 thread.join(); 1628 } catch (IOException | InterruptedException e) { 1629 throw new RuntimeException(e); 1630 } 1631 shouldWaitRef.set(false); 1632 break; 1633 default: 1634 break; 1635 } 1636 }); 1637 // Before HBASE-27519, here would throw java.io.FileNotFoundException because the storeFile 1638 // which used by StoreScanner.updateReaders is deleted by compactionTask. 1639 scanner.next(results); 1640 // The results is r0 row cells. 1641 assertEquals(3, results.size()); 1642 assertTrue(compactionExceptionRef.get() == null); 1643 } 1644 } 1645 1646 @Test 1647 public void testReclaimChunkWhenScaning() throws IOException { 1648 init("testReclaimChunkWhenScaning"); 1649 long ts = EnvironmentEdgeManager.currentTime(); 1650 long seqId = 100; 1651 byte[] value = Bytes.toBytes("value"); 1652 // older data whihc shouldn't be "seen" by client 1653 store.add(createCell(qf1, ts, seqId, value), null); 1654 store.add(createCell(qf2, ts, seqId, value), null); 1655 store.add(createCell(qf3, ts, seqId, value), null); 1656 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1657 quals.add(qf1); 1658 quals.add(qf2); 1659 quals.add(qf3); 1660 try (InternalScanner scanner = 1661 (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) { 1662 List<Cell> results = new MyList<>(size -> { 1663 switch (size) { 1664 // 1) we get the first cell (qf1) 1665 // 2) flush the data to have StoreScanner update inner scanners 1666 // 3) the chunk will be reclaimed after updaing 1667 case 1: 1668 try { 1669 flushStore(store, id++); 1670 } catch (IOException e) { 1671 throw new RuntimeException(e); 1672 } 1673 break; 1674 // 1) we get the second cell (qf2) 1675 // 2) add some cell to fill some byte into the chunk (we have only one chunk) 1676 case 2: 1677 try { 1678 byte[] newValue = Bytes.toBytes("newValue"); 
1679 // older data whihc shouldn't be "seen" by client 1680 store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null); 1681 store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null); 1682 store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null); 1683 } catch (IOException e) { 1684 throw new RuntimeException(e); 1685 } 1686 break; 1687 default: 1688 break; 1689 } 1690 }); 1691 scanner.next(results); 1692 assertEquals(3, results.size()); 1693 for (Cell c : results) { 1694 byte[] actualValue = CellUtil.cloneValue(c); 1695 assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:" 1696 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value)); 1697 } 1698 } 1699 } 1700 1701 /** 1702 * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the 1703 * versionedList. And the first InMemoryFlushRunnable will use the chagned versionedList to remove 1704 * the corresponding segments. In short, there will be some segements which isn't in merge are 1705 * removed. 
1706 */ 1707 @Test 1708 public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException { 1709 int flushSize = 500; 1710 Configuration conf = HBaseConfiguration.create(); 1711 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName()); 1712 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25); 1713 MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0); 1714 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize)); 1715 // Set the lower threshold to invoke the "MERGE" policy 1716 conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0)); 1717 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1718 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1719 byte[] value = Bytes.toBytes("thisisavarylargevalue"); 1720 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1721 long ts = EnvironmentEdgeManager.currentTime(); 1722 long seqId = 100; 1723 // older data whihc shouldn't be "seen" by client 1724 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1725 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1726 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1727 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1728 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1729 storeFlushCtx.prepare(); 1730 // This shouldn't invoke another in-memory flush because the first compactor thread 1731 // hasn't accomplished the in-memory compaction. 1732 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1733 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1734 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1735 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1736 // okay. 
Let the compaction be completed 1737 MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown(); 1738 CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore; 1739 while (mem.isMemStoreFlushingInMemory()) { 1740 TimeUnit.SECONDS.sleep(1); 1741 } 1742 // This should invoke another in-memory flush. 1743 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1744 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1745 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1746 assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1747 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1748 String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE)); 1749 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1750 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1751 } 1752 1753 @Test 1754 public void testAge() throws IOException { 1755 long currentTime = EnvironmentEdgeManager.currentTime(); 1756 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 1757 edge.setValue(currentTime); 1758 EnvironmentEdgeManager.injectEdge(edge); 1759 Configuration conf = TEST_UTIL.getConfiguration(); 1760 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family); 1761 initHRegion(name.getMethodName(), conf, 1762 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false); 1763 HStore store = new HStore(region, hcd, conf, false) { 1764 1765 @Override 1766 protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf, 1767 CellComparator kvComparator) throws IOException { 1768 List<HStoreFile> storefiles = 1769 Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100), 1770 mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000)); 1771 StoreFileManager sfm = mock(StoreFileManager.class); 1772 when(sfm.getStoreFiles()).thenReturn(storefiles); 1773 StoreEngine<?, ?, ?, ?> storeEngine = 
mock(StoreEngine.class); 1774 when(storeEngine.getStoreFileManager()).thenReturn(sfm); 1775 return storeEngine; 1776 } 1777 }; 1778 assertEquals(10L, store.getMinStoreFileAge().getAsLong()); 1779 assertEquals(10000L, store.getMaxStoreFileAge().getAsLong()); 1780 assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4); 1781 } 1782 1783 private HStoreFile mockStoreFile(long createdTime) { 1784 StoreFileInfo info = mock(StoreFileInfo.class); 1785 when(info.getCreatedTimestamp()).thenReturn(createdTime); 1786 HStoreFile sf = mock(HStoreFile.class); 1787 when(sf.getReader()).thenReturn(mock(StoreFileReader.class)); 1788 when(sf.isHFile()).thenReturn(true); 1789 when(sf.getFileInfo()).thenReturn(info); 1790 return sf; 1791 } 1792 1793 private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) 1794 throws IOException { 1795 return (MyStore) init(methodName, conf, 1796 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1797 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook); 1798 } 1799 1800 private static class MyStore extends HStore { 1801 private final MyStoreHook hook; 1802 1803 MyStore(final HRegion region, final ColumnFamilyDescriptor family, 1804 final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException { 1805 super(region, family, confParam, false); 1806 this.hook = hook; 1807 } 1808 1809 @Override 1810 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1811 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, 1812 boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 1813 boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException { 1814 hook.getScanners(this); 1815 return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, 1816 stopRow, false, readPt, includeMemstoreScanner, 
onlyLatestVersion); 1817 } 1818 1819 @Override 1820 public long getSmallestReadPoint() { 1821 return hook.getSmallestReadPoint(this); 1822 } 1823 } 1824 1825 private abstract static class MyStoreHook { 1826 1827 void getScanners(MyStore store) throws IOException { 1828 } 1829 1830 long getSmallestReadPoint(HStore store) { 1831 return store.getHRegion().getSmallestReadPoint(); 1832 } 1833 } 1834 1835 @Test 1836 public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception { 1837 Configuration conf = HBaseConfiguration.create(); 1838 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); 1839 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); 1840 // Set the lower threshold to invoke the "MERGE" policy 1841 MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() { 1842 }); 1843 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1844 long ts = EnvironmentEdgeManager.currentTime(); 1845 long seqID = 1L; 1846 // Add some data to the region and do some flushes 1847 for (int i = 1; i < 10; i++) { 1848 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1849 memStoreSizing); 1850 } 1851 // flush them 1852 flushStore(store, seqID); 1853 for (int i = 11; i < 20; i++) { 1854 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1855 memStoreSizing); 1856 } 1857 // flush them 1858 flushStore(store, seqID); 1859 for (int i = 21; i < 30; i++) { 1860 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1861 memStoreSizing); 1862 } 1863 // flush them 1864 flushStore(store, seqID); 1865 1866 assertEquals(3, store.getStorefilesCount()); 1867 Scan scan = new Scan(); 1868 scan.addFamily(family); 1869 Collection<HStoreFile> storefiles2 = store.getStorefiles(); 1870 ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2); 1871 StoreScanner storeScanner = 1872 (StoreScanner) 
store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); 1873 // get the current heap 1874 KeyValueHeap heap = storeScanner.heap; 1875 // create more store files 1876 for (int i = 31; i < 40; i++) { 1877 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1878 memStoreSizing); 1879 } 1880 // flush them 1881 flushStore(store, seqID); 1882 1883 for (int i = 41; i < 50; i++) { 1884 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1885 memStoreSizing); 1886 } 1887 // flush them 1888 flushStore(store, seqID); 1889 storefiles2 = store.getStorefiles(); 1890 ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2); 1891 actualStorefiles1.removeAll(actualStorefiles); 1892 // Do compaction 1893 MyThread thread = new MyThread(storeScanner); 1894 thread.start(); 1895 store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false); 1896 thread.join(); 1897 KeyValueHeap heap2 = thread.getHeap(); 1898 assertFalse(heap.equals(heap2)); 1899 } 1900 1901 @Test 1902 public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception { 1903 Configuration conf = HBaseConfiguration.create(); 1904 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); 1905 // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type. 1906 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1); 1907 MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() { 1908 }); 1909 Scan scan = new Scan(); 1910 scan.addFamily(family); 1911 // ReadType on Scan is still DEFAULT only. 
1912 assertEquals(ReadType.DEFAULT, scan.getReadType()); 1913 StoreScanner storeScanner = 1914 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); 1915 assertFalse(storeScanner.isScanUsePread()); 1916 } 1917 1918 @Test 1919 public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException { 1920 Configuration conf = HBaseConfiguration.create(); 1921 conf.set("hbase.systemtables.compacting.memstore.type", "eager"); 1922 init(name.getMethodName(), conf, 1923 TableDescriptorBuilder.newBuilder( 1924 TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())), 1925 ColumnFamilyDescriptorBuilder.newBuilder(family) 1926 .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build()); 1927 assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString() 1928 .startsWith("eager".toUpperCase())); 1929 } 1930 1931 @Test 1932 public void testSpaceQuotaChangeAfterReplacement() throws IOException { 1933 final TableName tn = TableName.valueOf(name.getMethodName()); 1934 init(name.getMethodName()); 1935 1936 RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl(); 1937 1938 HStoreFile sf1 = mockStoreFileWithLength(1024L); 1939 HStoreFile sf2 = mockStoreFileWithLength(2048L); 1940 HStoreFile sf3 = mockStoreFileWithLength(4096L); 1941 HStoreFile sf4 = mockStoreFileWithLength(8192L); 1942 1943 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a")) 1944 .setEndKey(Bytes.toBytes("b")).build(); 1945 1946 // Compacting two files down to one, reducing size 1947 sizeStore.put(regionInfo, 1024L + 4096L); 1948 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3), 1949 Arrays.asList(sf2)); 1950 1951 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 1952 1953 // The same file length in and out should have no change 1954 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, 
Arrays.asList(sf2), 1955 Arrays.asList(sf2)); 1956 1957 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 1958 1959 // Increase the total size used 1960 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2), 1961 Arrays.asList(sf3)); 1962 1963 assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize()); 1964 1965 RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b")) 1966 .setEndKey(Bytes.toBytes("c")).build(); 1967 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4)); 1968 1969 assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize()); 1970 } 1971 1972 @Test 1973 public void testHFileContextSetWithCFAndTable() throws Exception { 1974 init(this.name.getMethodName()); 1975 StoreFileWriter writer = store.getStoreEngine() 1976 .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L) 1977 .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true) 1978 .includesTag(false).shouldDropBehind(true)); 1979 HFileContext hFileContext = writer.getLiveFileWriter().getFileContext(); 1980 assertArrayEquals(family, hFileContext.getColumnFamily()); 1981 assertArrayEquals(table, hFileContext.getTableName()); 1982 } 1983 1984 // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell 1985 // but its dataSize exceeds inmemoryFlushSize 1986 @Test 1987 public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize() 1988 throws IOException, InterruptedException { 1989 Configuration conf = HBaseConfiguration.create(); 1990 1991 byte[] smallValue = new byte[3]; 1992 byte[] largeValue = new byte[9]; 1993 final long timestamp = EnvironmentEdgeManager.currentTime(); 1994 final long seqId = 100; 1995 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 1996 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 1997 int 
smallCellByteSize = MutableSegment.getCellLength(smallCell); 1998 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 1999 int flushByteSize = smallCellByteSize + largeCellByteSize - 2; 2000 2001 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2002 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName()); 2003 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2004 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2005 2006 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2007 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2008 2009 MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore); 2010 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2011 myCompactingMemStore.smallCellPreUpdateCounter.set(0); 2012 myCompactingMemStore.largeCellPreUpdateCounter.set(0); 2013 2014 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2015 Thread smallCellThread = new Thread(() -> { 2016 try { 2017 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2018 } catch (Throwable exception) { 2019 exceptionRef.set(exception); 2020 } 2021 }); 2022 smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME); 2023 smallCellThread.start(); 2024 2025 String oldThreadName = Thread.currentThread().getName(); 2026 try { 2027 /** 2028 * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then 2029 * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread 2030 * invokes flushInMemory. 2031 * <p/> 2032 * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread 2033 * can add cell to currentActive . That is to say when largeCellThread called flushInMemory 2034 * method, CompactingMemStore.active has no cell. 
2035 */ 2036 Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME); 2037 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2038 smallCellThread.join(); 2039 2040 for (int i = 0; i < 100; i++) { 2041 long currentTimestamp = timestamp + 100 + i; 2042 ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue); 2043 store.add(cell, new NonThreadSafeMemStoreSizing()); 2044 } 2045 } finally { 2046 Thread.currentThread().setName(oldThreadName); 2047 } 2048 2049 assertTrue(exceptionRef.get() == null); 2050 2051 } 2052 2053 // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds 2054 // InmemoryFlushSize 2055 @Test(timeout = 60000) 2056 public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception { 2057 Configuration conf = HBaseConfiguration.create(); 2058 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName()); 2059 2060 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2061 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2062 2063 MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore); 2064 2065 int size = (int) (myCompactingMemStore.getInmemoryFlushSize()); 2066 byte[] value = new byte[size + 1]; 2067 2068 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2069 long timestamp = EnvironmentEdgeManager.currentTime(); 2070 long seqId = 100; 2071 ExtendedCell cell = createCell(qf1, timestamp, seqId, value); 2072 int cellByteSize = MutableSegment.getCellLength(cell); 2073 store.add(cell, memStoreSizing); 2074 assertTrue(memStoreSizing.getCellsCount() == 1); 2075 assertTrue(memStoreSizing.getDataSize() == cellByteSize); 2076 // Waiting the in memory compaction completed, see HBASE-26438 2077 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2078 } 2079 2080 /** 2081 * This test is for HBASE-27464, before this JIRA,when init {@link 
CellChunkImmutableSegment} for 2082 * 'COMPACT' action, we not force copy to current MSLab. When cell size bigger than 2083 * {@link MemStoreLABImpl#maxAlloc}, cell will stay in previous chunk which will recycle after 2084 * segment replace, and we may read wrong data when these chunk reused by others. 2085 */ 2086 @Test 2087 public void testForceCloneOfBigCellForCellChunkImmutableSegment() throws Exception { 2088 Configuration conf = HBaseConfiguration.create(); 2089 int maxAllocByteSize = conf.getInt(MemStoreLAB.MAX_ALLOC_KEY, MemStoreLAB.MAX_ALLOC_DEFAULT); 2090 2091 // Construct big cell,which is large than {@link MemStoreLABImpl#maxAlloc}. 2092 byte[] cellValue = new byte[maxAllocByteSize + 1]; 2093 final long timestamp = EnvironmentEdgeManager.currentTime(); 2094 final long seqId = 100; 2095 final byte[] rowKey1 = Bytes.toBytes("rowKey1"); 2096 final ExtendedCell originalCell1 = createCell(rowKey1, qf1, timestamp, seqId, cellValue); 2097 final byte[] rowKey2 = Bytes.toBytes("rowKey2"); 2098 final ExtendedCell originalCell2 = createCell(rowKey2, qf1, timestamp, seqId, cellValue); 2099 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2100 quals.add(qf1); 2101 2102 int cellByteSize = MutableSegment.getCellLength(originalCell1); 2103 int inMemoryFlushByteSize = cellByteSize - 1; 2104 2105 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2106 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName()); 2107 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2108 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(inMemoryFlushByteSize * 200)); 2109 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2110 2111 // Use {@link MemoryCompactionPolicy#EAGER} for always compacting. 
2112 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2113 .setInMemoryCompaction(MemoryCompactionPolicy.EAGER).build()); 2114 2115 MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore); 2116 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == inMemoryFlushByteSize); 2117 2118 // Data chunk Pool is disabled. 2119 assertTrue(ChunkCreator.getInstance().getMaxCount(ChunkType.DATA_CHUNK) == 0); 2120 2121 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2122 2123 // First compact 2124 store.add(originalCell1, memStoreSizing); 2125 // Waiting for the first in-memory compaction finished 2126 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2127 2128 StoreScanner storeScanner = 2129 (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1); 2130 SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2131 ExtendedCell resultCell1 = segmentScanner.next(); 2132 assertTrue(PrivateCellUtil.equals(resultCell1, originalCell1)); 2133 int cell1ChunkId = resultCell1.getChunkId(); 2134 assertTrue(cell1ChunkId != ExtendedCell.CELL_NOT_BASED_ON_CHUNK); 2135 assertNull(segmentScanner.next()); 2136 segmentScanner.close(); 2137 storeScanner.close(); 2138 Segment segment = segmentScanner.segment; 2139 assertTrue(segment instanceof CellChunkImmutableSegment); 2140 MemStoreLABImpl memStoreLAB1 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2141 assertTrue(!memStoreLAB1.isClosed()); 2142 assertTrue(!memStoreLAB1.chunks.isEmpty()); 2143 assertTrue(!memStoreLAB1.isReclaimed()); 2144 2145 // Second compact 2146 store.add(originalCell2, memStoreSizing); 2147 // Waiting for the second in-memory compaction finished 2148 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2149 2150 // Before HBASE-27464, here may throw java.lang.IllegalArgumentException: In CellChunkMap, cell 2151 // must be associated 
with chunk.. We were looking for a cell at index 0. 2152 // The cause for this exception is because the data chunk Pool is disabled,when the data chunks 2153 // are recycled after the second in-memory compaction finished,the 2154 // {@link ChunkCreator.putbackChunks} method does not put the chunks back to the data chunk 2155 // pool,it just removes them from {@link ChunkCreator#chunkIdMap},so in 2156 // {@link CellChunkMap#getCell} we could not get the data chunk by chunkId. 2157 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1); 2158 segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2159 ExtendedCell newResultCell1 = segmentScanner.next(); 2160 assertTrue(newResultCell1 != resultCell1); 2161 assertTrue(PrivateCellUtil.equals(newResultCell1, originalCell1)); 2162 2163 ExtendedCell resultCell2 = segmentScanner.next(); 2164 assertTrue(PrivateCellUtil.equals(resultCell2, originalCell2)); 2165 assertNull(segmentScanner.next()); 2166 segmentScanner.close(); 2167 storeScanner.close(); 2168 2169 segment = segmentScanner.segment; 2170 assertTrue(segment instanceof CellChunkImmutableSegment); 2171 MemStoreLABImpl memStoreLAB2 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2172 assertTrue(!memStoreLAB2.isClosed()); 2173 assertTrue(!memStoreLAB2.chunks.isEmpty()); 2174 assertTrue(!memStoreLAB2.isReclaimed()); 2175 assertTrue(memStoreLAB1.isClosed()); 2176 assertTrue(memStoreLAB1.chunks.isEmpty()); 2177 assertTrue(memStoreLAB1.isReclaimed()); 2178 } 2179 2180 // This test is for HBASE-26210 also, test write large cell and small cell concurrently when 2181 // InmemoryFlushSize is smaller,equal with and larger than cell size. 
2182 @Test 2183 public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently() 2184 throws IOException, InterruptedException { 2185 doWriteTestLargeCellAndSmallCellConcurrently( 2186 (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1); 2187 doWriteTestLargeCellAndSmallCellConcurrently( 2188 (smallCellByteSize, largeCellByteSize) -> largeCellByteSize); 2189 doWriteTestLargeCellAndSmallCellConcurrently( 2190 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1); 2191 doWriteTestLargeCellAndSmallCellConcurrently( 2192 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize); 2193 doWriteTestLargeCellAndSmallCellConcurrently( 2194 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1); 2195 } 2196 2197 private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize) 2198 throws IOException, InterruptedException { 2199 2200 Configuration conf = HBaseConfiguration.create(); 2201 2202 byte[] smallValue = new byte[3]; 2203 byte[] largeValue = new byte[100]; 2204 final long timestamp = EnvironmentEdgeManager.currentTime(); 2205 final long seqId = 100; 2206 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2207 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2208 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2209 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2210 int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize); 2211 boolean flushByteSizeLessThanSmallAndLargeCellSize = 2212 flushByteSize < (smallCellByteSize + largeCellByteSize); 2213 2214 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName()); 2215 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2216 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2217 2218 
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2219 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2220 2221 MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore); 2222 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2223 myCompactingMemStore.disableCompaction(); 2224 if (flushByteSizeLessThanSmallAndLargeCellSize) { 2225 myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true; 2226 } else { 2227 myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false; 2228 } 2229 2230 final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing(); 2231 final AtomicLong totalCellByteSize = new AtomicLong(0); 2232 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2233 Thread smallCellThread = new Thread(() -> { 2234 try { 2235 for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { 2236 long currentTimestamp = timestamp + i; 2237 ExtendedCell cell = createCell(qf1, currentTimestamp, seqId, smallValue); 2238 totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); 2239 store.add(cell, memStoreSizing); 2240 } 2241 } catch (Throwable exception) { 2242 exceptionRef.set(exception); 2243 2244 } 2245 }); 2246 smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME); 2247 smallCellThread.start(); 2248 2249 String oldThreadName = Thread.currentThread().getName(); 2250 try { 2251 /** 2252 * When flushByteSizeLessThanSmallAndLargeCellSize is true: 2253 * </p> 2254 * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then 2255 * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then 2256 * largeCellThread invokes flushInMemory. 2257 * <p/> 2258 * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread 2259 * can run into MyCompactingMemStore3.checkAndAddToActiveSize again. 
2260 * <p/> 2261 * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and 2262 * largeCellThread concurrently write one cell and wait each other, and then write another 2263 * cell etc. 2264 */ 2265 Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME); 2266 for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { 2267 long currentTimestamp = timestamp + i; 2268 ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue); 2269 totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); 2270 store.add(cell, memStoreSizing); 2271 } 2272 smallCellThread.join(); 2273 2274 assertTrue(exceptionRef.get() == null); 2275 assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2)); 2276 assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get()); 2277 if (flushByteSizeLessThanSmallAndLargeCellSize) { 2278 assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT); 2279 } else { 2280 assertTrue( 2281 myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1)); 2282 } 2283 } finally { 2284 Thread.currentThread().setName(oldThreadName); 2285 } 2286 } 2287 2288 /** 2289 * <pre> 2290 * This test is for HBASE-26384, 2291 * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()} 2292 * execute concurrently. 2293 * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs 2294 * for both branch-2 and master): 2295 * 1. The {@link CompactingMemStore} size exceeds 2296 * {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new 2297 * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a 2298 * in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}. 2299 * 2. The in memory compact thread starts and then stopping before 2300 * {@link CompactingMemStore#flattenOneSegment}. 2301 * 3. 
The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently, after the
   * snapshot thread has executed {@link CompactingMemStore#getImmutableSegments}, the in memory
   * compact thread continues.
   * Assuming {@link VersionedSegmentsList#version} returned from
   * {@link CompactingMemStore#getImmutableSegments} is v.
   * 4. The snapshot thread stops before {@link CompactingMemStore#swapPipelineWithNull}.
   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
   * {@link CompactionPipeline#version} is still v.
   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
   * {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
   * thinks it is successful and continues flushing, but the {@link ImmutableSegment} in
   * {@link CompactionPipeline} has changed because of
   * {@link CompactingMemStore#flattenOneSegment}, so the {@link ImmutableSegment} is not
   * removed in fact and still remains in {@link CompactionPipeline}.
   *
   * After HBASE-26384, steps 5-6 are changed to the following, which is the expected behavior:
   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
   * {@link CompactingMemStore#flattenOneSegment} changes {@link CompactionPipeline#version} to
   * v+1.
   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
   * {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
   * fails and retries the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
   * again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
   * {@link CompactingMemStore#swapPipelineWithNull} succeeds.
   * </pre>
   */
  @Test
  public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
    Configuration conf = HBaseConfiguration.create();

    byte[] smallValue = new byte[3];
    byte[] largeValue = new byte[9];
    final long timestamp = EnvironmentEdgeManager.currentTime();
    final long seqId = 100;
    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
    int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
    // Two bytes below the combined size of both cells, so writing both cells goes over the
    // configured in-memory flush size.
    int flushByteSize = totalCellByteSize - 2;

    // Set CompactingMemStore.inmemoryFlushSize to flushByteSize (0.005 * flushByteSize * 200);
    // the resulting value is asserted right after init.
    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));

    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());

    MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);

    store.add(smallCell, new NonThreadSafeMemStoreSizing());
    store.add(largeCell, new NonThreadSafeMemStoreSizing());

    // The calling thread acts as the snapshot thread; its original name is restored in the
    // finally block.
    String oldThreadName = Thread.currentThread().getName();
    try {
      Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
      /**
       * {@link CompactingMemStore#snapshot} must wait until the in memory compact thread enters
       * {@link CompactingMemStore#flattenOneSegment}, because {@link CompactingMemStore#snapshot}
       * would invoke {@link CompactingMemStore#stopCompaction}.
       */
      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();

      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
      // Wait for the in memory compaction to finish before inspecting the pipeline.
      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();

      // The snapshot must hold both cells written above and nothing may be left in the pipeline.
      assertTrue(memStoreSnapshot.getCellsCount() == 2);
      assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
      assertTrue(segments.getNumOfSegments() == 0);
      assertTrue(segments.getNumOfCells() == 0);
      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
      // Two swap attempts, matching step 6 of the Javadoc: the first one fails on the version
      // bump and the retry succeeds.
      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
    } finally {
      Thread.currentThread().setName(oldThreadName);
    }
  }

  /**
   * <pre>
   * This test is for HBASE-26384,
   * test {@link CompactingMemStore#flattenOneSegment}, {@link CompactingMemStore#snapshot()}
   * and writeMemStore execute concurrently.
   * The threads sequence before HBASE-26384 is (the bug only exists for branch-2, and I add UTs
   * for both branch-2 and master):
   * 1. The {@link CompactingMemStore} size exceeds
   * {@link CompactingMemStore#getInmemoryFlushSize()}, the write thread adds a new
   * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline}, and starts an
   * in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
   * 2. The in memory compact thread starts and then stops before
   * {@link CompactingMemStore#flattenOneSegment}.
   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently, after the
   * snapshot thread has executed {@link CompactingMemStore#getImmutableSegments}, the in memory
   * compact thread continues.
   * Assuming {@link VersionedSegmentsList#version} returned from
   * {@link CompactingMemStore#getImmutableSegments} is v.
   * 4.
The snapshot thread stops before {@link CompactingMemStore#swapPipelineWithNull}.
   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
   * {@link CompactionPipeline#version} is still v.
   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
   * {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
   * thinks it is successful and continues flushing, but the {@link ImmutableSegment} in
   * {@link CompactionPipeline} has changed because of
   * {@link CompactingMemStore#flattenOneSegment}, so the {@link ImmutableSegment} is not
   * removed in fact and still remains in {@link CompactionPipeline}.
   *
   * After HBASE-26384, steps 5-6 are changed to the following, which is the expected behavior,
   * and I add steps 7-8 to test that a new segment is added before the retry.
   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
   * {@link CompactingMemStore#flattenOneSegment} changes {@link CompactionPipeline#version} to
   * v+1.
   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
   * {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
   * fails and retries, {@link VersionedSegmentsList#version} returned from
   * {@link CompactingMemStore#getImmutableSegments} is v+1.
   * 7. The write thread continues writing to {@link CompactingMemStore} and the
   * {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
   * {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
   * {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
   * {@link CompactionPipeline#version} is still v+1.
   * 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
   * {@link CompactionPipeline#version} is still v+1,
   * {@link CompactingMemStore#swapPipelineWithNull} succeeds. The new {@link ImmutableSegment}
   * remains at the head of {@link CompactingMemStore#pipeline}, the old one is removed by
   * {@link CompactingMemStore#swapPipelineWithNull}.
   * </pre>
   */
  @Test
  public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception {
    Configuration conf = HBaseConfiguration.create();

    byte[] smallValue = new byte[3];
    byte[] largeValue = new byte[9];
    final long timestamp = EnvironmentEdgeManager.currentTime();
    final long seqId = 100;
    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
    // Two bytes below the combined size of the first two cells, so writing both goes over the
    // configured in-memory flush size.
    int flushByteSize = firstWriteCellByteSize - 2;

    // Set CompactingMemStore.inmemoryFlushSize to flushByteSize (0.005 * flushByteSize * 200);
    // the resulting value is asserted right after init.
    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));

    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());

    final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);

    store.add(smallCell, new NonThreadSafeMemStoreSizing());
    store.add(largeCell, new NonThreadSafeMemStoreSizing());

    // Carries any failure of the write-again thread back to the asserting thread.
    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
    final ExtendedCell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
    final ExtendedCell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
    final int writeAgainCellByteSize =
      MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2);
    // Second writer (step 7 of the Javadoc): it waits on the start barrier, writes two more
    // cells, then waits on the end barrier.
    final Thread writeAgainThread = new Thread(() -> {
      try {
        myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();

        store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
        store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());

        myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
      } catch (Throwable exception) {
        exceptionRef.set(exception);
      }
    });
    writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
    writeAgainThread.start();

    // The calling thread acts as the snapshot thread; its original name is restored in the
    // finally block.
    String oldThreadName = Thread.currentThread().getName();
    try {
      Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
      /**
       * {@link CompactingMemStore#snapshot} must wait until the in memory compact thread enters
       * {@link CompactingMemStore#flattenOneSegment}, because {@link CompactingMemStore#snapshot}
       * would invoke {@link CompactingMemStore#stopCompaction}.
       */
      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
      writeAgainThread.join();

      // The snapshot holds only the first two cells; the two cells written again stay behind in
      // a single pipeline segment (step 8 of the Javadoc).
      assertTrue(memStoreSnapshot.getCellsCount() == 2);
      assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
      assertTrue(segments.getNumOfSegments() == 1);
      assertTrue(
        ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
      assertTrue(segments.getNumOfCells() == 2);
      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
      assertTrue(exceptionRef.get() == null);
      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
    } finally {
      Thread.currentThread().setName(oldThreadName);
    }
  }

  /**
   * <pre>
   * This test is for HBASE-26465,
   * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute
   * concurrently. The threads sequence before HBASE-26465 is:
   * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have been added to
   * {@link DefaultMemStore}.
   * 2.The flush thread stops before {@link DefaultMemStore#clearSnapshot} in
   * {@link HStore#updateStorefiles} after it has completed flushing the memStore to an hfile.
   * 3.The scan thread starts and stops after {@link DefaultMemStore#getSnapshotSegments} in
   * {@link DefaultMemStore#getScanners}, here the scan thread gets the
   * {@link DefaultMemStore#snapshot} which is created by the flush thread.
2519 * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and close 2520 * {@link DefaultMemStore#snapshot},because the reference count of the corresponding 2521 * {@link MemStoreLABImpl} is 0, the {@link Chunk}s in corresponding {@link MemStoreLABImpl} 2522 * are recycled. 2523 * 5.The scan thread continues {@link DefaultMemStore#getScanners},and create a 2524 * {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increase the 2525 * reference count of the corresponding {@link MemStoreLABImpl}, but {@link Chunk}s in 2526 * corresponding {@link MemStoreLABImpl} are recycled by step 4, and these {@link Chunk}s may 2527 * be overwritten by other write threads,which may cause serious problem. 2528 * After HBASE-26465,{@link DefaultMemStore#getScanners} and 2529 * {@link DefaultMemStore#clearSnapshot} could not execute concurrently. 2530 * </pre> 2531 */ 2532 @Test 2533 public void testClearSnapshotGetScannerConcurrently() throws Exception { 2534 Configuration conf = HBaseConfiguration.create(); 2535 2536 byte[] smallValue = new byte[3]; 2537 byte[] largeValue = new byte[9]; 2538 final long timestamp = EnvironmentEdgeManager.currentTime(); 2539 final long seqId = 100; 2540 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2541 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2542 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2543 quals.add(qf1); 2544 quals.add(qf2); 2545 2546 conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName()); 2547 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2548 2549 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2550 MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore); 2551 myDefaultMemStore.store = store; 2552 2553 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2554 store.add(smallCell, memStoreSizing); 2555 store.add(largeCell, 
memStoreSizing); 2556 2557 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2558 final Thread flushThread = new Thread(() -> { 2559 try { 2560 flushStore(store, id++); 2561 } catch (Throwable exception) { 2562 exceptionRef.set(exception); 2563 } 2564 }); 2565 flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME); 2566 flushThread.start(); 2567 2568 String oldThreadName = Thread.currentThread().getName(); 2569 StoreScanner storeScanner = null; 2570 try { 2571 Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME); 2572 2573 /** 2574 * Wait flush thread stopping before {@link DefaultMemStore#doClearSnapshot} 2575 */ 2576 myDefaultMemStore.getScannerCyclicBarrier.await(); 2577 2578 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 2579 flushThread.join(); 2580 2581 if (myDefaultMemStore.shouldWait) { 2582 SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2583 MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2584 assertTrue(memStoreLAB.isClosed()); 2585 assertTrue(!memStoreLAB.chunks.isEmpty()); 2586 assertTrue(!memStoreLAB.isReclaimed()); 2587 2588 ExtendedCell cell1 = segmentScanner.next(); 2589 PrivateCellUtil.equals(smallCell, cell1); 2590 ExtendedCell cell2 = segmentScanner.next(); 2591 PrivateCellUtil.equals(largeCell, cell2); 2592 assertNull(segmentScanner.next()); 2593 } else { 2594 List<ExtendedCell> results = new ArrayList<>(); 2595 storeScanner.next(results); 2596 assertEquals(2, results.size()); 2597 PrivateCellUtil.equals(smallCell, results.get(0)); 2598 PrivateCellUtil.equals(largeCell, results.get(1)); 2599 } 2600 assertTrue(exceptionRef.get() == null); 2601 } finally { 2602 if (storeScanner != null) { 2603 storeScanner.close(); 2604 } 2605 Thread.currentThread().setName(oldThreadName); 2606 } 2607 } 2608 2609 @SuppressWarnings("unchecked") 2610 private <T> T 
getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) { 2611 List<T> resultScanners = new ArrayList<T>(); 2612 for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) { 2613 if (keyValueScannerClass.isInstance(keyValueScanner)) { 2614 resultScanners.add((T) keyValueScanner); 2615 } 2616 } 2617 assertTrue(resultScanners.size() == 1); 2618 return resultScanners.get(0); 2619 } 2620 2621 @Test 2622 public void testOnConfigurationChange() throws IOException { 2623 final int COMMON_MAX_FILES_TO_COMPACT = 10; 2624 final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8; 2625 final int STORE_MAX_FILES_TO_COMPACT = 6; 2626 2627 // Build a table that its maxFileToCompact different from common configuration. 2628 Configuration conf = HBaseConfiguration.create(); 2629 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2630 COMMON_MAX_FILES_TO_COMPACT); 2631 conf.setBoolean(CACHE_DATA_ON_READ_KEY, false); 2632 conf.setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true); 2633 conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true); 2634 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 2635 .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2636 String.valueOf(STORE_MAX_FILES_TO_COMPACT)) 2637 .build(); 2638 init(this.name.getMethodName(), conf, hcd); 2639 2640 // After updating common configuration, the conf in HStore itself must not be changed. 
2641 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2642 NEW_COMMON_MAX_FILES_TO_COMPACT); 2643 this.store.onConfigurationChange(conf); 2644 2645 assertEquals(STORE_MAX_FILES_TO_COMPACT, 2646 store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact()); 2647 2648 assertEquals(conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ), false); 2649 assertEquals(conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), true); 2650 assertEquals(conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), true); 2651 2652 // reset to default values 2653 conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ); 2654 conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE); 2655 conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE); 2656 this.store.onConfigurationChange(conf); 2657 } 2658 2659 /** 2660 * This test is for HBASE-26476 2661 */ 2662 @Test 2663 public void testExtendsDefaultMemStore() throws Exception { 2664 Configuration conf = HBaseConfiguration.create(); 2665 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2666 2667 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2668 assertTrue(this.store.memstore.getClass() == DefaultMemStore.class); 2669 tearDown(); 2670 2671 conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName()); 2672 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2673 assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class); 2674 } 2675 2676 static class CustomDefaultMemStore extends DefaultMemStore { 2677 2678 public CustomDefaultMemStore(Configuration conf, CellComparator c, 2679 RegionServicesForStores regionServices) { 2680 super(conf, c, regionServices); 2681 } 2682 2683 } 2684 2685 /** 2686 * This test is for HBASE-26488 2687 */ 2688 @Test 2689 public void testMemoryLeakWhenFlushMemStoreRetrying() throws 
Exception { 2690 Configuration conf = HBaseConfiguration.create(); 2691 2692 byte[] smallValue = new byte[3]; 2693 byte[] largeValue = new byte[9]; 2694 final long timestamp = EnvironmentEdgeManager.currentTime(); 2695 final long seqId = 100; 2696 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2697 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2698 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2699 quals.add(qf1); 2700 quals.add(qf2); 2701 2702 conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName()); 2703 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2704 conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, 2705 MyDefaultStoreFlusher.class.getName()); 2706 2707 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2708 MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore); 2709 assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher); 2710 2711 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2712 store.add(smallCell, memStoreSizing); 2713 store.add(largeCell, memStoreSizing); 2714 flushStore(store, id++); 2715 2716 MemStoreLABImpl memStoreLAB = 2717 (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB()); 2718 assertTrue(memStoreLAB.isClosed()); 2719 assertTrue(memStoreLAB.getRefCntValue() == 0); 2720 assertTrue(memStoreLAB.isReclaimed()); 2721 assertTrue(memStoreLAB.chunks.isEmpty()); 2722 StoreScanner storeScanner = null; 2723 try { 2724 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 2725 assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1); 2726 assertTrue(store.memstore.size().getCellsCount() == 0); 2727 assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0); 2728 assertTrue(storeScanner.currentScanners.size() == 1); 2729 
assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner); 2730 2731 List<ExtendedCell> results = new ArrayList<>(); 2732 storeScanner.next(results); 2733 assertEquals(2, results.size()); 2734 PrivateCellUtil.equals(smallCell, results.get(0)); 2735 PrivateCellUtil.equals(largeCell, results.get(1)); 2736 } finally { 2737 if (storeScanner != null) { 2738 storeScanner.close(); 2739 } 2740 } 2741 } 2742 2743 static class MyDefaultMemStore1 extends DefaultMemStore { 2744 2745 private ImmutableSegment snapshotImmutableSegment; 2746 2747 public MyDefaultMemStore1(Configuration conf, CellComparator c, 2748 RegionServicesForStores regionServices) { 2749 super(conf, c, regionServices); 2750 } 2751 2752 @Override 2753 public MemStoreSnapshot snapshot() { 2754 MemStoreSnapshot result = super.snapshot(); 2755 this.snapshotImmutableSegment = snapshot; 2756 return result; 2757 } 2758 2759 } 2760 2761 public static class MyDefaultStoreFlusher extends DefaultStoreFlusher { 2762 private static final AtomicInteger failCounter = new AtomicInteger(1); 2763 private static final AtomicInteger counter = new AtomicInteger(0); 2764 2765 public MyDefaultStoreFlusher(Configuration conf, HStore store) { 2766 super(conf, store); 2767 } 2768 2769 @Override 2770 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 2771 MonitoredTask status, ThroughputController throughputController, 2772 FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException { 2773 counter.incrementAndGet(); 2774 return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker, 2775 writerCreationTracker); 2776 } 2777 2778 @Override 2779 protected void performFlush(InternalScanner scanner, final CellSink sink, 2780 ThroughputController throughputController) throws IOException { 2781 2782 final int currentCount = counter.get(); 2783 CellSink newCellSink = (cell) -> { 2784 if (currentCount <= failCounter.get()) { 2785 throw new 
IOException("Simulated exception by tests"); 2786 } 2787 sink.append(cell); 2788 }; 2789 super.performFlush(scanner, newCellSink, throughputController); 2790 } 2791 } 2792 2793 /** 2794 * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB} 2795 */ 2796 @Test 2797 public void testImmutableMemStoreLABRefCnt() throws Exception { 2798 Configuration conf = HBaseConfiguration.create(); 2799 2800 byte[] smallValue = new byte[3]; 2801 byte[] largeValue = new byte[9]; 2802 final long timestamp = EnvironmentEdgeManager.currentTime(); 2803 final long seqId = 100; 2804 final ExtendedCell smallCell1 = createCell(qf1, timestamp, seqId, smallValue); 2805 final ExtendedCell largeCell1 = createCell(qf2, timestamp, seqId, largeValue); 2806 final ExtendedCell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue); 2807 final ExtendedCell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue); 2808 final ExtendedCell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue); 2809 final ExtendedCell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue); 2810 2811 int smallCellByteSize = MutableSegment.getCellLength(smallCell1); 2812 int largeCellByteSize = MutableSegment.getCellLength(largeCell1); 2813 int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); 2814 int flushByteSize = firstWriteCellByteSize - 2; 2815 2816 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 
2817 conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName()); 2818 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2819 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2820 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2821 2822 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2823 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2824 2825 final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore); 2826 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2827 myCompactingMemStore.allowCompaction.set(false); 2828 2829 NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2830 store.add(smallCell1, memStoreSizing); 2831 store.add(largeCell1, memStoreSizing); 2832 store.add(smallCell2, memStoreSizing); 2833 store.add(largeCell2, memStoreSizing); 2834 store.add(smallCell3, memStoreSizing); 2835 store.add(largeCell3, memStoreSizing); 2836 VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); 2837 assertTrue(versionedSegmentsList.getNumOfSegments() == 3); 2838 List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments(); 2839 List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size()); 2840 for (ImmutableSegment segment : segments) { 2841 memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB()); 2842 } 2843 List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE); 2844 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2845 assertTrue(memStoreLAB.getRefCntValue() == 2); 2846 } 2847 2848 myCompactingMemStore.allowCompaction.set(true); 2849 myCompactingMemStore.flushInMemory(); 2850 2851 versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); 2852 assertTrue(versionedSegmentsList.getNumOfSegments() == 1); 2853 ImmutableMemStoreLAB 
immutableMemStoreLAB = 2854 (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB()); 2855 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2856 assertTrue(memStoreLAB.getRefCntValue() == 2); 2857 } 2858 2859 List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE); 2860 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2861 assertTrue(memStoreLAB.getRefCntValue() == 2); 2862 } 2863 assertTrue(immutableMemStoreLAB.getRefCntValue() == 2); 2864 for (KeyValueScanner scanner : scanners1) { 2865 scanner.close(); 2866 } 2867 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2868 assertTrue(memStoreLAB.getRefCntValue() == 1); 2869 } 2870 for (KeyValueScanner scanner : scanners2) { 2871 scanner.close(); 2872 } 2873 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2874 assertTrue(memStoreLAB.getRefCntValue() == 1); 2875 } 2876 assertTrue(immutableMemStoreLAB.getRefCntValue() == 1); 2877 flushStore(store, id++); 2878 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2879 assertTrue(memStoreLAB.getRefCntValue() == 0); 2880 } 2881 assertTrue(immutableMemStoreLAB.getRefCntValue() == 0); 2882 assertTrue(immutableMemStoreLAB.isClosed()); 2883 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2884 assertTrue(memStoreLAB.isClosed()); 2885 assertTrue(memStoreLAB.isReclaimed()); 2886 assertTrue(memStoreLAB.chunks.isEmpty()); 2887 } 2888 } 2889 2890 private HStoreFile mockStoreFileWithLength(long length) { 2891 HStoreFile sf = mock(HStoreFile.class); 2892 StoreFileReader sfr = mock(StoreFileReader.class); 2893 when(sf.isHFile()).thenReturn(true); 2894 when(sf.getReader()).thenReturn(sfr); 2895 when(sfr.length()).thenReturn(length); 2896 return sf; 2897 } 2898 2899 private static class MyThread extends Thread { 2900 private StoreScanner scanner; 2901 private KeyValueHeap heap; 2902 2903 public MyThread(StoreScanner scanner) { 2904 this.scanner = scanner; 2905 } 2906 2907 public KeyValueHeap getHeap() { 2908 return 
this.heap; 2909 } 2910 2911 @Override 2912 public void run() { 2913 scanner.trySwitchToStreamRead(); 2914 heap = scanner.heap; 2915 } 2916 } 2917 2918 private static class MyMemStoreCompactor extends MemStoreCompactor { 2919 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 2920 private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); 2921 2922 public MyMemStoreCompactor(CompactingMemStore compactingMemStore, 2923 MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException { 2924 super(compactingMemStore, compactionPolicy); 2925 } 2926 2927 @Override 2928 public boolean start() throws IOException { 2929 boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; 2930 if (isFirst) { 2931 try { 2932 START_COMPACTOR_LATCH.await(); 2933 return super.start(); 2934 } catch (InterruptedException ex) { 2935 throw new RuntimeException(ex); 2936 } 2937 } 2938 return super.start(); 2939 } 2940 } 2941 2942 public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore { 2943 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 2944 2945 public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c, 2946 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 2947 throws IOException { 2948 super(conf, c, store, regionServices, compactionPolicy); 2949 } 2950 2951 @Override 2952 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) 2953 throws IllegalArgumentIOException { 2954 return new MyMemStoreCompactor(this, compactionPolicy); 2955 } 2956 2957 @Override 2958 protected boolean setInMemoryCompactionFlag() { 2959 boolean rval = super.setInMemoryCompactionFlag(); 2960 if (rval) { 2961 RUNNER_COUNT.incrementAndGet(); 2962 if (LOG.isDebugEnabled()) { 2963 LOG.debug("runner count: " + RUNNER_COUNT.get()); 2964 } 2965 } 2966 return rval; 2967 } 2968 } 2969 2970 public static class 
MyCompactingMemStore extends CompactingMemStore { 2971 private static final AtomicBoolean START_TEST = new AtomicBoolean(false); 2972 private final CountDownLatch getScannerLatch = new CountDownLatch(1); 2973 private final CountDownLatch snapshotLatch = new CountDownLatch(1); 2974 2975 public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store, 2976 RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 2977 throws IOException { 2978 super(conf, c, store, regionServices, compactionPolicy); 2979 } 2980 2981 @Override 2982 protected List<KeyValueScanner> createList(int capacity) { 2983 if (START_TEST.get()) { 2984 try { 2985 getScannerLatch.countDown(); 2986 snapshotLatch.await(); 2987 } catch (InterruptedException e) { 2988 throw new RuntimeException(e); 2989 } 2990 } 2991 return new ArrayList<>(capacity); 2992 } 2993 2994 @Override 2995 protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) { 2996 if (START_TEST.get()) { 2997 try { 2998 getScannerLatch.await(); 2999 } catch (InterruptedException e) { 3000 throw new RuntimeException(e); 3001 } 3002 } 3003 3004 super.pushActiveToPipeline(active, checkEmpty); 3005 if (START_TEST.get()) { 3006 snapshotLatch.countDown(); 3007 } 3008 } 3009 } 3010 3011 interface MyListHook { 3012 void hook(int currentSize); 3013 } 3014 3015 private static class MyList<T> implements List<T> { 3016 private final List<T> delegatee = new ArrayList<>(); 3017 private final MyListHook hookAtAdd; 3018 3019 MyList(final MyListHook hookAtAdd) { 3020 this.hookAtAdd = hookAtAdd; 3021 } 3022 3023 @Override 3024 public int size() { 3025 return delegatee.size(); 3026 } 3027 3028 @Override 3029 public boolean isEmpty() { 3030 return delegatee.isEmpty(); 3031 } 3032 3033 @Override 3034 public boolean contains(Object o) { 3035 return delegatee.contains(o); 3036 } 3037 3038 @Override 3039 public Iterator<T> iterator() { 3040 return delegatee.iterator(); 3041 } 3042 3043 @Override 
3044 public Object[] toArray() { 3045 return delegatee.toArray(); 3046 } 3047 3048 @Override 3049 public <R> R[] toArray(R[] a) { 3050 return delegatee.toArray(a); 3051 } 3052 3053 @Override 3054 public boolean add(T e) { 3055 hookAtAdd.hook(size()); 3056 return delegatee.add(e); 3057 } 3058 3059 @Override 3060 public boolean remove(Object o) { 3061 return delegatee.remove(o); 3062 } 3063 3064 @Override 3065 public boolean containsAll(Collection<?> c) { 3066 return delegatee.containsAll(c); 3067 } 3068 3069 @Override 3070 public boolean addAll(Collection<? extends T> c) { 3071 return delegatee.addAll(c); 3072 } 3073 3074 @Override 3075 public boolean addAll(int index, Collection<? extends T> c) { 3076 return delegatee.addAll(index, c); 3077 } 3078 3079 @Override 3080 public boolean removeAll(Collection<?> c) { 3081 return delegatee.removeAll(c); 3082 } 3083 3084 @Override 3085 public boolean retainAll(Collection<?> c) { 3086 return delegatee.retainAll(c); 3087 } 3088 3089 @Override 3090 public void clear() { 3091 delegatee.clear(); 3092 } 3093 3094 @Override 3095 public T get(int index) { 3096 return delegatee.get(index); 3097 } 3098 3099 @Override 3100 public T set(int index, T element) { 3101 return delegatee.set(index, element); 3102 } 3103 3104 @Override 3105 public void add(int index, T element) { 3106 delegatee.add(index, element); 3107 } 3108 3109 @Override 3110 public T remove(int index) { 3111 return delegatee.remove(index); 3112 } 3113 3114 @Override 3115 public int indexOf(Object o) { 3116 return delegatee.indexOf(o); 3117 } 3118 3119 @Override 3120 public int lastIndexOf(Object o) { 3121 return delegatee.lastIndexOf(o); 3122 } 3123 3124 @Override 3125 public ListIterator<T> listIterator() { 3126 return delegatee.listIterator(); 3127 } 3128 3129 @Override 3130 public ListIterator<T> listIterator(int index) { 3131 return delegatee.listIterator(index); 3132 } 3133 3134 @Override 3135 public List<T> subList(int fromIndex, int toIndex) { 3136 return 
delegatee.subList(fromIndex, toIndex); 3137 } 3138 } 3139 3140 public static class MyCompactingMemStore2 extends CompactingMemStore { 3141 private static final String LARGE_CELL_THREAD_NAME = "largeCellThread"; 3142 private static final String SMALL_CELL_THREAD_NAME = "smallCellThread"; 3143 private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2); 3144 private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2); 3145 private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0); 3146 private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0); 3147 3148 public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator, 3149 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3150 throws IOException { 3151 super(conf, cellComparator, store, regionServices, compactionPolicy); 3152 } 3153 3154 @Override 3155 protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd, 3156 MemStoreSizing memstoreSizing) { 3157 if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { 3158 int currentCount = largeCellPreUpdateCounter.incrementAndGet(); 3159 if (currentCount <= 1) { 3160 try { 3161 /** 3162 * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then 3163 * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then 3164 * largeCellThread invokes flushInMemory. 
3165 */ 3166 preCyclicBarrier.await(); 3167 } catch (Throwable e) { 3168 throw new RuntimeException(e); 3169 } 3170 } 3171 } 3172 3173 boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); 3174 if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { 3175 try { 3176 preCyclicBarrier.await(); 3177 } catch (Throwable e) { 3178 throw new RuntimeException(e); 3179 } 3180 } 3181 return returnValue; 3182 } 3183 3184 @Override 3185 protected void doAdd(MutableSegment currentActive, ExtendedCell cell, 3186 MemStoreSizing memstoreSizing) { 3187 if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { 3188 try { 3189 /** 3190 * After largeCellThread finished flushInMemory method, smallCellThread can add cell to 3191 * currentActive . That is to say when largeCellThread called flushInMemory method, 3192 * currentActive has no cell. 3193 */ 3194 postCyclicBarrier.await(); 3195 } catch (Throwable e) { 3196 throw new RuntimeException(e); 3197 } 3198 } 3199 super.doAdd(currentActive, cell, memstoreSizing); 3200 } 3201 3202 @Override 3203 protected void flushInMemory(MutableSegment currentActiveMutableSegment) { 3204 super.flushInMemory(currentActiveMutableSegment); 3205 if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { 3206 if (largeCellPreUpdateCounter.get() <= 1) { 3207 try { 3208 postCyclicBarrier.await(); 3209 } catch (Throwable e) { 3210 throw new RuntimeException(e); 3211 } 3212 } 3213 } 3214 } 3215 3216 } 3217 3218 public static class MyCompactingMemStore3 extends CompactingMemStore { 3219 private static final String LARGE_CELL_THREAD_NAME = "largeCellThread"; 3220 private static final String SMALL_CELL_THREAD_NAME = "smallCellThread"; 3221 3222 private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2); 3223 private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2); 3224 private final AtomicInteger flushCounter = new AtomicInteger(0); 3225 private static final int 
CELL_COUNT = 5; 3226 private boolean flushByteSizeLessThanSmallAndLargeCellSize = true; 3227 3228 public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator, 3229 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3230 throws IOException { 3231 super(conf, cellComparator, store, regionServices, compactionPolicy); 3232 } 3233 3234 @Override 3235 protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd, 3236 MemStoreSizing memstoreSizing) { 3237 if (!flushByteSizeLessThanSmallAndLargeCellSize) { 3238 return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); 3239 } 3240 if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { 3241 try { 3242 preCyclicBarrier.await(); 3243 } catch (Throwable e) { 3244 throw new RuntimeException(e); 3245 } 3246 } 3247 3248 boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); 3249 if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { 3250 try { 3251 preCyclicBarrier.await(); 3252 } catch (Throwable e) { 3253 throw new RuntimeException(e); 3254 } 3255 } 3256 return returnValue; 3257 } 3258 3259 @Override 3260 protected void postUpdate(MutableSegment currentActiveMutableSegment) { 3261 super.postUpdate(currentActiveMutableSegment); 3262 if (!flushByteSizeLessThanSmallAndLargeCellSize) { 3263 try { 3264 postCyclicBarrier.await(); 3265 } catch (Throwable e) { 3266 throw new RuntimeException(e); 3267 } 3268 return; 3269 } 3270 3271 if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { 3272 try { 3273 postCyclicBarrier.await(); 3274 } catch (Throwable e) { 3275 throw new RuntimeException(e); 3276 } 3277 } 3278 } 3279 3280 @Override 3281 protected void flushInMemory(MutableSegment currentActiveMutableSegment) { 3282 super.flushInMemory(currentActiveMutableSegment); 3283 flushCounter.incrementAndGet(); 3284 if 
(!flushByteSizeLessThanSmallAndLargeCellSize) { 3285 return; 3286 } 3287 3288 assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)); 3289 try { 3290 postCyclicBarrier.await(); 3291 } catch (Throwable e) { 3292 throw new RuntimeException(e); 3293 } 3294 3295 } 3296 3297 void disableCompaction() { 3298 allowCompaction.set(false); 3299 } 3300 3301 void enableCompaction() { 3302 allowCompaction.set(true); 3303 } 3304 3305 } 3306 3307 public static class MyCompactingMemStore4 extends CompactingMemStore { 3308 private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread"; 3309 /** 3310 * {@link CompactingMemStore#flattenOneSegment} must execute after 3311 * {@link CompactingMemStore#getImmutableSegments} 3312 */ 3313 private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2); 3314 /** 3315 * Only after {@link CompactingMemStore#flattenOneSegment} completed, 3316 * {@link CompactingMemStore#swapPipelineWithNull} could execute. 3317 */ 3318 private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2); 3319 /** 3320 * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the 3321 * snapshot thread starts {@link CompactingMemStore#snapshot},because 3322 * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}. 3323 */ 3324 private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2); 3325 /** 3326 * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping. 
3327 */ 3328 private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); 3329 private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0); 3330 private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0); 3331 private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0); 3332 private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0); 3333 3334 public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator, 3335 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3336 throws IOException { 3337 super(conf, cellComparator, store, regionServices, compactionPolicy); 3338 } 3339 3340 @Override 3341 public VersionedSegmentsList getImmutableSegments() { 3342 VersionedSegmentsList result = super.getImmutableSegments(); 3343 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3344 int currentCount = getImmutableSegmentsListCounter.incrementAndGet(); 3345 if (currentCount <= 1) { 3346 try { 3347 flattenOneSegmentPreCyclicBarrier.await(); 3348 } catch (Throwable e) { 3349 throw new RuntimeException(e); 3350 } 3351 } 3352 } 3353 return result; 3354 } 3355 3356 @Override 3357 protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { 3358 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3359 int currentCount = swapPipelineWithNullCounter.incrementAndGet(); 3360 if (currentCount <= 1) { 3361 try { 3362 flattenOneSegmentPostCyclicBarrier.await(); 3363 } catch (Throwable e) { 3364 throw new RuntimeException(e); 3365 } 3366 } 3367 } 3368 boolean result = super.swapPipelineWithNull(segments); 3369 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3370 int currentCount = swapPipelineWithNullCounter.get(); 3371 if (currentCount <= 1) { 3372 assertTrue(!result); 3373 } 3374 if (currentCount == 2) { 3375 assertTrue(result); 3376 } 
3377 } 3378 return result; 3379 3380 } 3381 3382 @Override 3383 public void flattenOneSegment(long requesterVersion, Action action) { 3384 int currentCount = flattenOneSegmentCounter.incrementAndGet(); 3385 if (currentCount <= 1) { 3386 try { 3387 /** 3388 * {@link CompactingMemStore#snapshot} could start. 3389 */ 3390 snapShotStartCyclicCyclicBarrier.await(); 3391 flattenOneSegmentPreCyclicBarrier.await(); 3392 } catch (Throwable e) { 3393 throw new RuntimeException(e); 3394 } 3395 } 3396 super.flattenOneSegment(requesterVersion, action); 3397 if (currentCount <= 1) { 3398 try { 3399 flattenOneSegmentPostCyclicBarrier.await(); 3400 } catch (Throwable e) { 3401 throw new RuntimeException(e); 3402 } 3403 } 3404 } 3405 3406 @Override 3407 protected boolean setInMemoryCompactionFlag() { 3408 boolean result = super.setInMemoryCompactionFlag(); 3409 assertTrue(result); 3410 setInMemoryCompactionFlagCounter.incrementAndGet(); 3411 return result; 3412 } 3413 3414 @Override 3415 void inMemoryCompaction() { 3416 try { 3417 super.inMemoryCompaction(); 3418 } finally { 3419 try { 3420 inMemoryCompactionEndCyclicBarrier.await(); 3421 } catch (Throwable e) { 3422 throw new RuntimeException(e); 3423 } 3424 3425 } 3426 } 3427 3428 } 3429 3430 public static class MyCompactingMemStore5 extends CompactingMemStore { 3431 private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread"; 3432 private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread"; 3433 /** 3434 * {@link CompactingMemStore#flattenOneSegment} must execute after 3435 * {@link CompactingMemStore#getImmutableSegments} 3436 */ 3437 private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2); 3438 /** 3439 * Only after {@link CompactingMemStore#flattenOneSegment} completed, 3440 * {@link CompactingMemStore#swapPipelineWithNull} could execute. 
3441 */ 3442 private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2); 3443 /** 3444 * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the 3445 * snapshot thread starts {@link CompactingMemStore#snapshot},because 3446 * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}. 3447 */ 3448 private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2); 3449 /** 3450 * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping. 3451 */ 3452 private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); 3453 private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0); 3454 private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0); 3455 private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0); 3456 private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0); 3457 /** 3458 * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain 3459 * thread could start. 3460 */ 3461 private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2); 3462 /** 3463 * This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the 3464 * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would 3465 * execute,and in memory compact thread would exit,because we expect that in memory compact 3466 * executing only once. 
3467 */ 3468 private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3); 3469 3470 public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator, 3471 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3472 throws IOException { 3473 super(conf, cellComparator, store, regionServices, compactionPolicy); 3474 } 3475 3476 @Override 3477 public VersionedSegmentsList getImmutableSegments() { 3478 VersionedSegmentsList result = super.getImmutableSegments(); 3479 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3480 int currentCount = getImmutableSegmentsListCounter.incrementAndGet(); 3481 if (currentCount <= 1) { 3482 try { 3483 flattenOneSegmentPreCyclicBarrier.await(); 3484 } catch (Throwable e) { 3485 throw new RuntimeException(e); 3486 } 3487 } 3488 3489 } 3490 3491 return result; 3492 } 3493 3494 @Override 3495 protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { 3496 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3497 int currentCount = swapPipelineWithNullCounter.incrementAndGet(); 3498 if (currentCount <= 1) { 3499 try { 3500 flattenOneSegmentPostCyclicBarrier.await(); 3501 } catch (Throwable e) { 3502 throw new RuntimeException(e); 3503 } 3504 } 3505 3506 if (currentCount == 2) { 3507 try { 3508 /** 3509 * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, 3510 * writeAgain thread could start. 3511 */ 3512 writeMemStoreAgainStartCyclicBarrier.await(); 3513 /** 3514 * Only the writeAgain thread completes, retry 3515 * {@link CompactingMemStore#swapPipelineWithNull} would execute. 
3516 */ 3517 writeMemStoreAgainEndCyclicBarrier.await(); 3518 } catch (Throwable e) { 3519 throw new RuntimeException(e); 3520 } 3521 } 3522 3523 } 3524 boolean result = super.swapPipelineWithNull(segments); 3525 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3526 int currentCount = swapPipelineWithNullCounter.get(); 3527 if (currentCount <= 1) { 3528 assertTrue(!result); 3529 } 3530 if (currentCount == 2) { 3531 assertTrue(result); 3532 } 3533 } 3534 return result; 3535 3536 } 3537 3538 @Override 3539 public void flattenOneSegment(long requesterVersion, Action action) { 3540 int currentCount = flattenOneSegmentCounter.incrementAndGet(); 3541 if (currentCount <= 1) { 3542 try { 3543 /** 3544 * {@link CompactingMemStore#snapshot} could start. 3545 */ 3546 snapShotStartCyclicCyclicBarrier.await(); 3547 flattenOneSegmentPreCyclicBarrier.await(); 3548 } catch (Throwable e) { 3549 throw new RuntimeException(e); 3550 } 3551 } 3552 super.flattenOneSegment(requesterVersion, action); 3553 if (currentCount <= 1) { 3554 try { 3555 flattenOneSegmentPostCyclicBarrier.await(); 3556 /** 3557 * Only the writeAgain thread completes, in memory compact thread would exit,because we 3558 * expect that in memory compact executing only once. 
3559 */ 3560 writeMemStoreAgainEndCyclicBarrier.await(); 3561 } catch (Throwable e) { 3562 throw new RuntimeException(e); 3563 } 3564 3565 } 3566 } 3567 3568 @Override 3569 protected boolean setInMemoryCompactionFlag() { 3570 boolean result = super.setInMemoryCompactionFlag(); 3571 int count = setInMemoryCompactionFlagCounter.incrementAndGet(); 3572 if (count <= 1) { 3573 assertTrue(result); 3574 } 3575 if (count == 2) { 3576 assertTrue(!result); 3577 } 3578 return result; 3579 } 3580 3581 @Override 3582 void inMemoryCompaction() { 3583 try { 3584 super.inMemoryCompaction(); 3585 } finally { 3586 try { 3587 inMemoryCompactionEndCyclicBarrier.await(); 3588 } catch (Throwable e) { 3589 throw new RuntimeException(e); 3590 } 3591 3592 } 3593 } 3594 } 3595 3596 public static class MyCompactingMemStore6 extends CompactingMemStore { 3597 private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); 3598 3599 public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator, 3600 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3601 throws IOException { 3602 super(conf, cellComparator, store, regionServices, compactionPolicy); 3603 } 3604 3605 @Override 3606 void inMemoryCompaction() { 3607 try { 3608 super.inMemoryCompaction(); 3609 } finally { 3610 try { 3611 inMemoryCompactionEndCyclicBarrier.await(); 3612 } catch (Throwable e) { 3613 throw new RuntimeException(e); 3614 } 3615 3616 } 3617 } 3618 } 3619 3620 public static class MyDefaultMemStore extends DefaultMemStore { 3621 private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread"; 3622 private static final String FLUSH_THREAD_NAME = "flushMyThread"; 3623 /** 3624 * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner thread 3625 * could start. 
3626 */ 3627 private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2); 3628 /** 3629 * Used by getScanner thread notifies flush thread {@link DefaultMemStore#getSnapshotSegments} 3630 * completed, {@link DefaultMemStore#doClearSnapShot} could continue. 3631 */ 3632 private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2); 3633 /** 3634 * Used by flush thread notifies getScanner thread {@link DefaultMemStore#doClearSnapShot} 3635 * completed, {@link DefaultMemStore#getScanners} could continue. 3636 */ 3637 private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2); 3638 private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0); 3639 private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0); 3640 private volatile boolean shouldWait = true; 3641 private volatile HStore store = null; 3642 3643 public MyDefaultMemStore(Configuration conf, CellComparator cellComparator, 3644 RegionServicesForStores regionServices) throws IOException { 3645 super(conf, cellComparator, regionServices); 3646 } 3647 3648 @Override 3649 protected List<Segment> getSnapshotSegments() { 3650 3651 List<Segment> result = super.getSnapshotSegments(); 3652 3653 if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) { 3654 int currentCount = getSnapshotSegmentsCounter.incrementAndGet(); 3655 if (currentCount == 1) { 3656 if (this.shouldWait) { 3657 try { 3658 /** 3659 * Notify flush thread {@link DefaultMemStore#getSnapshotSegments} completed, 3660 * {@link DefaultMemStore#doClearSnapShot} could continue. 3661 */ 3662 preClearSnapShotCyclicBarrier.await(); 3663 /** 3664 * Wait for {@link DefaultMemStore#doClearSnapShot} completed. 
3665 */ 3666 postClearSnapShotCyclicBarrier.await(); 3667 3668 } catch (Throwable e) { 3669 throw new RuntimeException(e); 3670 } 3671 } 3672 } 3673 } 3674 return result; 3675 } 3676 3677 @Override 3678 protected void doClearSnapShot() { 3679 if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) { 3680 int currentCount = clearSnapshotCounter.incrementAndGet(); 3681 if (currentCount == 1) { 3682 try { 3683 if ( 3684 ((ReentrantReadWriteLock) store.getStoreEngine().getLock()) 3685 .isWriteLockedByCurrentThread() 3686 ) { 3687 shouldWait = false; 3688 } 3689 /** 3690 * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner 3691 * thread could start. 3692 */ 3693 getScannerCyclicBarrier.await(); 3694 3695 if (shouldWait) { 3696 /** 3697 * Wait for {@link DefaultMemStore#getSnapshotSegments} completed. 3698 */ 3699 preClearSnapShotCyclicBarrier.await(); 3700 } 3701 } catch (Throwable e) { 3702 throw new RuntimeException(e); 3703 } 3704 } 3705 } 3706 super.doClearSnapShot(); 3707 3708 if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) { 3709 int currentCount = clearSnapshotCounter.get(); 3710 if (currentCount == 1) { 3711 if (shouldWait) { 3712 try { 3713 /** 3714 * Notify getScanner thread {@link DefaultMemStore#doClearSnapShot} completed, 3715 * {@link DefaultMemStore#getScanners} could continue. 3716 */ 3717 postClearSnapShotCyclicBarrier.await(); 3718 } catch (Throwable e) { 3719 throw new RuntimeException(e); 3720 } 3721 } 3722 } 3723 } 3724 } 3725 } 3726}