001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY;
021import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_ON_READ_KEY;
022import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_READ;
023import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_WRITE;
024import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE;
025import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
026import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
027import static org.junit.Assert.assertArrayEquals;
028import static org.junit.Assert.assertEquals;
029import static org.junit.Assert.assertFalse;
030import static org.junit.Assert.assertNull;
031import static org.junit.Assert.assertTrue;
032import static org.junit.Assert.fail;
033import static org.mockito.ArgumentMatchers.any;
034import static org.mockito.Mockito.mock;
035import static org.mockito.Mockito.spy;
036import static org.mockito.Mockito.times;
037import static org.mockito.Mockito.verify;
038import static org.mockito.Mockito.when;
039
040import java.io.FileNotFoundException;
041import java.io.IOException;
042import java.lang.ref.SoftReference;
043import java.security.PrivilegedExceptionAction;
044import java.util.ArrayList;
045import java.util.Arrays;
046import java.util.Collection;
047import java.util.Collections;
048import java.util.Iterator;
049import java.util.List;
050import java.util.ListIterator;
051import java.util.NavigableSet;
052import java.util.Optional;
053import java.util.TreeSet;
054import java.util.concurrent.BrokenBarrierException;
055import java.util.concurrent.ConcurrentSkipListSet;
056import java.util.concurrent.CountDownLatch;
057import java.util.concurrent.CyclicBarrier;
058import java.util.concurrent.ExecutorService;
059import java.util.concurrent.Executors;
060import java.util.concurrent.Future;
061import java.util.concurrent.ThreadPoolExecutor;
062import java.util.concurrent.TimeUnit;
063import java.util.concurrent.atomic.AtomicBoolean;
064import java.util.concurrent.atomic.AtomicInteger;
065import java.util.concurrent.atomic.AtomicLong;
066import java.util.concurrent.atomic.AtomicReference;
067import java.util.concurrent.locks.ReentrantReadWriteLock;
068import java.util.function.Consumer;
069import java.util.function.IntBinaryOperator;
070import org.apache.hadoop.conf.Configuration;
071import org.apache.hadoop.fs.FSDataOutputStream;
072import org.apache.hadoop.fs.FileStatus;
073import org.apache.hadoop.fs.FileSystem;
074import org.apache.hadoop.fs.FilterFileSystem;
075import org.apache.hadoop.fs.LocalFileSystem;
076import org.apache.hadoop.fs.Path;
077import org.apache.hadoop.fs.permission.FsPermission;
078import org.apache.hadoop.hbase.Cell;
079import org.apache.hadoop.hbase.CellBuilderType;
080import org.apache.hadoop.hbase.CellComparator;
081import org.apache.hadoop.hbase.CellComparatorImpl;
082import org.apache.hadoop.hbase.CellUtil;
083import org.apache.hadoop.hbase.ExtendedCell;
084import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
085import org.apache.hadoop.hbase.HBaseClassTestRule;
086import org.apache.hadoop.hbase.HBaseConfiguration;
087import org.apache.hadoop.hbase.HBaseTestingUtil;
088import org.apache.hadoop.hbase.HConstants;
089import org.apache.hadoop.hbase.KeyValue;
090import org.apache.hadoop.hbase.MemoryCompactionPolicy;
091import org.apache.hadoop.hbase.NamespaceDescriptor;
092import org.apache.hadoop.hbase.PrivateCellUtil;
093import org.apache.hadoop.hbase.TableName;
094import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
095import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
096import org.apache.hadoop.hbase.client.Get;
097import org.apache.hadoop.hbase.client.RegionInfo;
098import org.apache.hadoop.hbase.client.RegionInfoBuilder;
099import org.apache.hadoop.hbase.client.Scan;
100import org.apache.hadoop.hbase.client.Scan.ReadType;
101import org.apache.hadoop.hbase.client.TableDescriptor;
102import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
103import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
104import org.apache.hadoop.hbase.filter.Filter;
105import org.apache.hadoop.hbase.filter.FilterBase;
106import org.apache.hadoop.hbase.io.compress.Compression;
107import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
108import org.apache.hadoop.hbase.io.hfile.CacheConfig;
109import org.apache.hadoop.hbase.io.hfile.HFile;
110import org.apache.hadoop.hbase.io.hfile.HFileContext;
111import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
112import org.apache.hadoop.hbase.monitoring.MonitoredTask;
113import org.apache.hadoop.hbase.nio.RefCnt;
114import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
115import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
116import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
117import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
118import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
119import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
120import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy;
121import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
122import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
123import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
124import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
125import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
126import org.apache.hadoop.hbase.security.User;
127import org.apache.hadoop.hbase.testclassification.MediumTests;
128import org.apache.hadoop.hbase.testclassification.RegionServerTests;
129import org.apache.hadoop.hbase.util.BloomFilterUtil;
130import org.apache.hadoop.hbase.util.Bytes;
131import org.apache.hadoop.hbase.util.CommonFSUtils;
132import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
133import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
134import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
135import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
136import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
137import org.apache.hadoop.hbase.wal.WALFactory;
138import org.apache.hadoop.util.Progressable;
139import org.junit.After;
140import org.junit.AfterClass;
141import org.junit.Before;
142import org.junit.ClassRule;
143import org.junit.Rule;
144import org.junit.Test;
145import org.junit.experimental.categories.Category;
146import org.junit.rules.TestName;
147import org.mockito.Mockito;
148import org.slf4j.Logger;
149import org.slf4j.LoggerFactory;
150
151import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
152
153/**
154 * Test class for the HStore
155 */
156@Category({ RegionServerTests.class, MediumTests.class })
157public class TestHStore {
158
159  @ClassRule
160  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHStore.class);
161
162  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
163  @Rule
164  public TestName name = new TestName();
165
166  HRegion region;
167  HStore store;
168  byte[] table = Bytes.toBytes("table");
169  byte[] family = Bytes.toBytes("family");
170
171  byte[] row = Bytes.toBytes("row");
172  byte[] row2 = Bytes.toBytes("row2");
173  byte[] qf1 = Bytes.toBytes("qf1");
174  byte[] qf2 = Bytes.toBytes("qf2");
175  byte[] qf3 = Bytes.toBytes("qf3");
176  byte[] qf4 = Bytes.toBytes("qf4");
177  byte[] qf5 = Bytes.toBytes("qf5");
178  byte[] qf6 = Bytes.toBytes("qf6");
179
180  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);
181
182  List<Cell> expected = new ArrayList<>();
183  List<Cell> result = new ArrayList<>();
184
185  long id = EnvironmentEdgeManager.currentTime();
186  Get get = new Get(row);
187
188  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
189  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
190
191  @Before
192  public void setUp() throws IOException {
193    qualifiers.clear();
194    qualifiers.add(qf1);
195    qualifiers.add(qf3);
196    qualifiers.add(qf5);
197
198    Iterator<byte[]> iter = qualifiers.iterator();
199    while (iter.hasNext()) {
200      byte[] next = iter.next();
201      expected.add(new KeyValue(row, family, next, 1, (byte[]) null));
202      get.addColumn(family, next);
203    }
204  }
205
206  private void init(String methodName) throws IOException {
207    init(methodName, TEST_UTIL.getConfiguration());
208  }
209
210  private HStore init(String methodName, Configuration conf) throws IOException {
211    // some of the tests write 4 versions and then flush
212    // (with HBASE-4241, lower versions are collected on flush)
213    return init(methodName, conf,
214      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
215  }
216
217  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
218    throws IOException {
219    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
220  }
221
222  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
223    ColumnFamilyDescriptor hcd) throws IOException {
224    return init(methodName, conf, builder, hcd, null);
225  }
226
227  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
228    ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
229    return init(methodName, conf, builder, hcd, hook, false);
230  }
231
232  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
233    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
234    TableDescriptor htd = builder.setColumnFamily(hcd).build();
235    Path basedir = new Path(DIR + methodName);
236    Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
237    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));
238
239    FileSystem fs = FileSystem.get(conf);
240
241    fs.delete(logdir, true);
242    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
243      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null,
244      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
245    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
246    Configuration walConf = new Configuration(conf);
247    CommonFSUtils.setRootDir(walConf, basedir);
248    WALFactory wals = new WALFactory(walConf, methodName);
249    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
250      htd, null);
251    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
252    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
253    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
254  }
255
256  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
257    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
258    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
259    if (hook == null) {
260      store = new HStore(region, hcd, conf, false);
261    } else {
262      store = new MyStore(region, hcd, conf, hook, switchToPread);
263    }
264    region.stores.put(store.getColumnFamilyDescriptor().getName(), store);
265    return store;
266  }
267
268  /**
269   * Test we do not lose data if we fail a flush and then close. Part of HBase-10466
270   */
271  @Test
272  public void testFlushSizeSizing() throws Exception {
273    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
274    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
275    // Only retry once.
276    conf.setInt("hbase.hstore.flush.retries.number", 1);
277    User user = User.createUserForTesting(conf, this.name.getMethodName(), new String[] { "foo" });
278    // Inject our faulty LocalFileSystem
279    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
280    user.runAs(new PrivilegedExceptionAction<Object>() {
281      @Override
282      public Object run() throws Exception {
283        // Make sure it worked (above is sensitive to caching details in hadoop core)
284        FileSystem fs = FileSystem.get(conf);
285        assertEquals(FaultyFileSystem.class, fs.getClass());
286        FaultyFileSystem ffs = (FaultyFileSystem) fs;
287
288        // Initialize region
289        init(name.getMethodName(), conf);
290
291        MemStoreSize mss = store.memstore.getFlushableSize();
292        assertEquals(0, mss.getDataSize());
293        LOG.info("Adding some data");
294        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
295        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
296        // add the heap size of active (mutable) segment
297        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
298        mss = store.memstore.getFlushableSize();
299        assertEquals(kvSize.getMemStoreSize(), mss);
300        // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
301        try {
302          LOG.info("Flushing");
303          flushStore(store, id++);
304          fail("Didn't bubble up IOE!");
305        } catch (IOException ioe) {
306          assertTrue(ioe.getMessage().contains("Fault injected"));
307        }
308        // due to snapshot, change mutable to immutable segment
309        kvSize.incMemStoreSize(0,
310          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
311        mss = store.memstore.getFlushableSize();
312        assertEquals(kvSize.getMemStoreSize(), mss);
313        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
314        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
315        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
316        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
317        // not yet cleared the snapshot -- the above flush failed.
318        assertEquals(kvSize.getMemStoreSize(), mss);
319        ffs.fault.set(false);
320        flushStore(store, id++);
321        mss = store.memstore.getFlushableSize();
322        // Size should be the foreground kv size.
323        assertEquals(kvSize2.getMemStoreSize(), mss);
324        flushStore(store, id++);
325        mss = store.memstore.getFlushableSize();
326        assertEquals(0, mss.getDataSize());
327        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
328        return null;
329      }
330    });
331  }
332
333  @Test
334  public void testStoreBloomFilterMetricsWithBloomRowCol() throws IOException {
335    int numStoreFiles = 5;
336    writeAndRead(BloomType.ROWCOL, numStoreFiles);
337
338    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
339    // hard to know exactly the numbers here, we are just trying to
340    // prove that they are incrementing
341    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
342    assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
343  }
344
345  @Test
346  public void testStoreBloomFilterMetricsWithBloomRow() throws IOException {
347    int numStoreFiles = 5;
348    writeAndRead(BloomType.ROWCOL, numStoreFiles);
349
350    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
351    // hard to know exactly the numbers here, we are just trying to
352    // prove that they are incrementing
353    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
354    assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
355  }
356
357  @Test
358  public void testStoreBloomFilterMetricsWithBloomRowPrefix() throws IOException {
359    int numStoreFiles = 5;
360    writeAndRead(BloomType.ROWPREFIX_FIXED_LENGTH, numStoreFiles);
361
362    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
363    // hard to know exactly the numbers here, we are just trying to
364    // prove that they are incrementing
365    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
366  }
367
368  @Test
369  public void testStoreBloomFilterMetricsWithBloomNone() throws IOException {
370    int numStoreFiles = 5;
371    writeAndRead(BloomType.NONE, numStoreFiles);
372
373    assertEquals(0, store.getBloomFilterRequestsCount());
374    assertEquals(0, store.getBloomFilterNegativeResultsCount());
375
376    // hard to know exactly the numbers here, we are just trying to
377    // prove that they are incrementing
378    assertTrue(store.getBloomFilterEligibleRequestsCount() >= numStoreFiles);
379  }
380
381  private void writeAndRead(BloomType bloomType, int numStoreFiles) throws IOException {
382    Configuration conf = HBaseConfiguration.create();
383    FileSystem fs = FileSystem.get(conf);
384
385    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
386      .setCompressionType(Compression.Algorithm.GZ).setBloomFilterType(bloomType)
387      .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "3").build();
388    init(name.getMethodName(), conf, hcd);
389
390    for (int i = 1; i <= numStoreFiles; i++) {
391      byte[] row = Bytes.toBytes("row" + i);
392      LOG.info("Adding some data for the store file #" + i);
393      long timeStamp = EnvironmentEdgeManager.currentTime();
394      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
395      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
396      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
397      flush(i);
398    }
399
400    // Verify the total number of store files
401    assertEquals(numStoreFiles, this.store.getStorefiles().size());
402
403    TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
404    columns.add(qf1);
405
406    for (int i = 1; i <= numStoreFiles; i++) {
407      KeyValueScanner scanner =
408        store.getScanner(new Scan(new Get(Bytes.toBytes("row" + i))), columns, 0);
409      scanner.peek();
410    }
411  }
412
413  /**
414   * Verify that compression and data block encoding are respected by the createWriter method, used
415   * on store flush.
416   */
417  @Test
418  public void testCreateWriter() throws Exception {
419    Configuration conf = HBaseConfiguration.create();
420    FileSystem fs = FileSystem.get(conf);
421
422    ColumnFamilyDescriptor hcd =
423      ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ)
424        .setDataBlockEncoding(DataBlockEncoding.DIFF).build();
425    init(name.getMethodName(), conf, hcd);
426
427    // Test createWriter
428    StoreFileWriter writer = store.getStoreEngine()
429      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4)
430        .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true)
431        .includesTag(false).shouldDropBehind(false));
432    Path path = writer.getPath();
433    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
434    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
435    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
436    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
437    writer.close();
438
439    // Verify that compression and encoding settings are respected
440    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
441    assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
442    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
443    reader.close();
444  }
445
446  @Test
447  public void testDeleteExpiredStoreFiles() throws Exception {
448    testDeleteExpiredStoreFiles(0);
449    testDeleteExpiredStoreFiles(1);
450  }
451
452  /**
453   * @param minVersions the MIN_VERSIONS for the column family
454   */
455  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
456    int storeFileNum = 4;
457    int ttl = 4;
458    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
459    EnvironmentEdgeManagerTestHelper.injectEdge(edge);
460
461    Configuration conf = HBaseConfiguration.create();
462    // Enable the expired store file deletion
463    conf.setBoolean("hbase.store.delete.expired.storefile", true);
464    // Set the compaction threshold higher to avoid normal compactions.
465    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
466
467    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
468      .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());
469
470    long storeTtl = this.store.getScanInfo().getTtl();
471    long sleepTime = storeTtl / storeFileNum;
472    long timeStamp;
473    // There are 4 store files and the max time stamp difference among these
474    // store files will be (this.store.ttl / storeFileNum)
475    for (int i = 1; i <= storeFileNum; i++) {
476      LOG.info("Adding some data for the store file #" + i);
477      timeStamp = EnvironmentEdgeManager.currentTime();
478      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
479      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
480      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
481      flush(i);
482      edge.incrementTime(sleepTime);
483    }
484
485    // Verify the total number of store files
486    assertEquals(storeFileNum, this.store.getStorefiles().size());
487
488    // Each call will find one expired store file and delete it before compaction happens.
489    // There will be no compaction due to threshold above. Last file will not be replaced.
490    for (int i = 1; i <= storeFileNum - 1; i++) {
491      // verify the expired store file.
492      assertFalse(this.store.requestCompaction().isPresent());
493      Collection<HStoreFile> sfs = this.store.getStorefiles();
494      // Ensure i files are gone.
495      if (minVersions == 0) {
496        assertEquals(storeFileNum - i, sfs.size());
497        // Ensure only non-expired files remain.
498        for (HStoreFile sf : sfs) {
499          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
500        }
501      } else {
502        assertEquals(storeFileNum, sfs.size());
503      }
504      // Let the next store file expired.
505      edge.incrementTime(sleepTime);
506    }
507    assertFalse(this.store.requestCompaction().isPresent());
508
509    Collection<HStoreFile> sfs = this.store.getStorefiles();
510    // Assert the last expired file is not removed.
511    if (minVersions == 0) {
512      assertEquals(1, sfs.size());
513    }
514    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
515    assertTrue(ts < (edge.currentTime() - storeTtl));
516
517    for (HStoreFile sf : sfs) {
518      sf.closeStoreFile(true);
519    }
520  }
521
522  @Test
523  public void testLowestModificationTime() throws Exception {
524    Configuration conf = HBaseConfiguration.create();
525    FileSystem fs = FileSystem.get(conf);
526    // Initialize region
527    init(name.getMethodName(), conf);
528
529    int storeFileNum = 4;
530    for (int i = 1; i <= storeFileNum; i++) {
531      LOG.info("Adding some data for the store file #" + i);
532      this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null);
533      this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null);
534      this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null);
535      flush(i);
536    }
537    // after flush; check the lowest time stamp
538    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
539    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
540    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
541
542    // after compact; check the lowest time stamp
543    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
544    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
545    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
546    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
547  }
548
549  private static long getLowestTimeStampFromFS(FileSystem fs,
550    final Collection<HStoreFile> candidates) throws IOException {
551    long minTs = Long.MAX_VALUE;
552    if (candidates.isEmpty()) {
553      return minTs;
554    }
555    Path[] p = new Path[candidates.size()];
556    int i = 0;
557    for (HStoreFile sf : candidates) {
558      p[i] = sf.getPath();
559      ++i;
560    }
561
562    FileStatus[] stats = fs.listStatus(p);
563    if (stats == null || stats.length == 0) {
564      return minTs;
565    }
566    for (FileStatus s : stats) {
567      minTs = Math.min(minTs, s.getModificationTime());
568    }
569    return minTs;
570  }
571
572  //////////////////////////////////////////////////////////////////////////////
573  // Get tests
574  //////////////////////////////////////////////////////////////////////////////
575
576  private static final int BLOCKSIZE_SMALL = 8192;
577
578  /**
579   * Test for hbase-1686.
580   */
581  @Test
582  public void testEmptyStoreFile() throws IOException {
583    init(this.name.getMethodName());
584    // Write a store file.
585    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
586    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
587    flush(1);
588    // Now put in place an empty store file. Its a little tricky. Have to
589    // do manually with hacked in sequence id.
590    HStoreFile f = this.store.getStorefiles().iterator().next();
591    Path storedir = f.getPath().getParent();
592    long seqid = f.getMaxSequenceId();
593    Configuration c = HBaseConfiguration.create();
594    FileSystem fs = FileSystem.get(c);
595    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
596    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
597      .withOutputDir(storedir).withFileContext(meta).build();
598    w.appendMetadata(seqid + 1, false);
599    w.close();
600    this.store.close();
601    // Reopen it... should pick up two files
602    this.store =
603      new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
604    assertEquals(2, this.store.getStorefilesCount());
605
606    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
607    assertEquals(1, result.size());
608  }
609
610  /**
611   * Getting data from memstore only
612   */
613  @Test
614  public void testGet_FromMemStoreOnly() throws IOException {
615    init(this.name.getMethodName());
616
617    // Put data in memstore
618    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
619    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
620    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
621    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
622    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
623    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
624
625    // Get
626    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
627
628    // Compare
629    assertCheck();
630  }
631
632  @Test
633  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
634    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
635    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
636    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
637  }
638
639  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
640    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
641      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
642    long currentTs = 100;
643    long minTs = currentTs;
644    // the extra cell won't be flushed to disk,
645    // so the min of timerange will be different between memStore and hfile.
646    for (int i = 0; i != (maxVersion + 1); ++i) {
647      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
648      if (i == 1) {
649        minTs = currentTs;
650      }
651    }
652    flushStore(store, id++);
653
654    Collection<HStoreFile> files = store.getStorefiles();
655    assertEquals(1, files.size());
656    HStoreFile f = files.iterator().next();
657    f.initReader();
658    StoreFileReader reader = f.getReader();
659    assertEquals(minTs, reader.timeRange.getMin());
660    assertEquals(currentTs, reader.timeRange.getMax());
661  }
662
663  /**
664   * Getting data from files only
665   */
666  @Test
667  public void testGet_FromFilesOnly() throws IOException {
668    init(this.name.getMethodName());
669
670    // Put data in memstore
671    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
672    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
673    // flush
674    flush(1);
675
676    // Add more data
677    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
678    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
679    // flush
680    flush(2);
681
682    // Add more data
683    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
684    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
685    // flush
686    flush(3);
687
688    // Get
689    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
690    // this.store.get(get, qualifiers, result);
691
692    // Need to sort the result since multiple files
693    Collections.sort(result, CellComparatorImpl.COMPARATOR);
694
695    // Compare
696    assertCheck();
697  }
698
699  /**
700   * Getting data from memstore and files
701   */
702  @Test
703  public void testGet_FromMemStoreAndFiles() throws IOException {
704    init(this.name.getMethodName());
705
706    // Put data in memstore
707    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
708    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
709    // flush
710    flush(1);
711
712    // Add more data
713    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
714    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
715    // flush
716    flush(2);
717
718    // Add more data
719    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
720    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
721
722    // Get
723    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
724
725    // Need to sort the result since multiple files
726    Collections.sort(result, CellComparatorImpl.COMPARATOR);
727
728    // Compare
729    assertCheck();
730  }
731
732  private void flush(int storeFilessize) throws IOException {
733    flushStore(store, id++);
734    assertEquals(storeFilessize, this.store.getStorefiles().size());
735    assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount());
736  }
737
738  private void assertCheck() {
739    assertEquals(expected.size(), result.size());
740    for (int i = 0; i < expected.size(); i++) {
741      assertEquals(expected.get(i), result.get(i));
742    }
743  }
744
745  @After
746  public void tearDown() throws Exception {
747    EnvironmentEdgeManagerTestHelper.reset();
748    if (store != null) {
749      try {
750        store.close();
751      } catch (IOException e) {
752      }
753      store = null;
754    }
755    if (region != null) {
756      region.close();
757      region = null;
758    }
759  }
760
761  @AfterClass
762  public static void tearDownAfterClass() throws IOException {
763    TEST_UTIL.cleanupTestDir();
764  }
765
766  @Test
767  public void testHandleErrorsInFlush() throws Exception {
768    LOG.info("Setting up a faulty file system that cannot write");
769
770    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
771    User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" });
772    // Inject our faulty LocalFileSystem
773    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
774    user.runAs(new PrivilegedExceptionAction<Object>() {
775      @Override
776      public Object run() throws Exception {
777        // Make sure it worked (above is sensitive to caching details in hadoop core)
778        FileSystem fs = FileSystem.get(conf);
779        assertEquals(FaultyFileSystem.class, fs.getClass());
780
781        // Initialize region
782        init(name.getMethodName(), conf);
783
784        LOG.info("Adding some data");
785        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
786        store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
787        store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
788
789        LOG.info("Before flush, we should have no files");
790
791        StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, store.getStoreContext());
792        Collection<StoreFileInfo> files = sft.load();
793        assertEquals(0, files != null ? files.size() : 0);
794
795        // flush
796        try {
797          LOG.info("Flushing");
798          flush(1);
799          fail("Didn't bubble up IOE!");
800        } catch (IOException ioe) {
801          assertTrue(ioe.getMessage().contains("Fault injected"));
802        }
803
804        LOG.info("After failed flush, we should still have no files!");
805        files = sft.load();
806        assertEquals(0, files != null ? files.size() : 0);
807        store.getHRegion().getWAL().close();
808        return null;
809      }
810    });
811    FileSystem.closeAllForUGI(user.getUGI());
812  }
813
814  /**
815   * Faulty file system that will fail if you write past its fault position the FIRST TIME only;
816   * thereafter it will succeed. Used by {@link TestHRegion} too.
817   */
818  static class FaultyFileSystem extends FilterFileSystem {
819    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
820    private long faultPos = 200;
821    AtomicBoolean fault = new AtomicBoolean(true);
822
823    public FaultyFileSystem() {
824      super(new LocalFileSystem());
825      LOG.info("Creating faulty!");
826    }
827
828    @Override
829    public FSDataOutputStream create(Path p) throws IOException {
830      return new FaultyOutputStream(super.create(p), faultPos, fault);
831    }
832
833    @Override
834    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
835      int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
836      return new FaultyOutputStream(
837        super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress),
838        faultPos, fault);
839    }
840
841    @Override
842    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize,
843      short replication, long blockSize, Progressable progress) throws IOException {
844      // Fake it. Call create instead. The default implementation throws an IOE
845      // that this is not supported.
846      return create(f, overwrite, bufferSize, replication, blockSize, progress);
847    }
848  }
849
850  static class FaultyOutputStream extends FSDataOutputStream {
851    volatile long faultPos = Long.MAX_VALUE;
852    private final AtomicBoolean fault;
853
854    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
855      throws IOException {
856      super(out, null);
857      this.faultPos = faultPos;
858      this.fault = fault;
859    }
860
861    @Override
862    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
863      LOG.info("faulty stream write at pos " + getPos());
864      injectFault();
865      super.write(buf, offset, length);
866    }
867
868    private void injectFault() throws IOException {
869      if (this.fault.get() && getPos() >= faultPos) {
870        throw new IOException("Fault injected");
871      }
872    }
873  }
874
875  private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
876    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
877    storeFlushCtx.prepare();
878    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
879    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
880    return storeFlushCtx;
881  }
882
883  /**
884   * Generate a list of KeyValues for testing based on given parameters
885   * @return the rows key-value list
886   */
887  private List<ExtendedCell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier,
888    byte[] family) {
889    List<ExtendedCell> kvList = new ArrayList<>();
890    for (int i = 1; i <= numRows; i++) {
891      byte[] b = Bytes.toBytes(i);
892      for (long timestamp : timestamps) {
893        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
894      }
895    }
896    return kvList;
897  }
898
899  /**
900   * Test to ensure correctness when using Stores with multiple timestamps
901   */
902  @Test
903  public void testMultipleTimestamps() throws IOException {
904    int numRows = 1;
905    long[] timestamps1 = new long[] { 1, 5, 10, 20 };
906    long[] timestamps2 = new long[] { 30, 80 };
907
908    init(this.name.getMethodName());
909
910    List<ExtendedCell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family);
911    for (ExtendedCell kv : kvList1) {
912      this.store.add(kv, null);
913    }
914
915    flushStore(store, id++);
916
917    List<ExtendedCell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family);
918    for (ExtendedCell kv : kvList2) {
919      this.store.add(kv, null);
920    }
921
922    List<Cell> result;
923    Get get = new Get(Bytes.toBytes(1));
924    get.addColumn(family, qf1);
925
926    get.setTimeRange(0, 15);
927    result = HBaseTestingUtil.getFromStoreFile(store, get);
928    assertTrue(result.size() > 0);
929
930    get.setTimeRange(40, 90);
931    result = HBaseTestingUtil.getFromStoreFile(store, get);
932    assertTrue(result.size() > 0);
933
934    get.setTimeRange(10, 45);
935    result = HBaseTestingUtil.getFromStoreFile(store, get);
936    assertTrue(result.size() > 0);
937
938    get.setTimeRange(80, 145);
939    result = HBaseTestingUtil.getFromStoreFile(store, get);
940    assertTrue(result.size() > 0);
941
942    get.setTimeRange(1, 2);
943    result = HBaseTestingUtil.getFromStoreFile(store, get);
944    assertTrue(result.size() > 0);
945
946    get.setTimeRange(90, 200);
947    result = HBaseTestingUtil.getFromStoreFile(store, get);
948    assertTrue(result.size() == 0);
949  }
950
951  /**
952   * Test for HBASE-3492 - Test split on empty colfam (no store files).
953   * @throws IOException When the IO operations fail.
954   */
955  @Test
956  public void testSplitWithEmptyColFam() throws IOException {
957    init(this.name.getMethodName());
958    assertFalse(store.getSplitPoint().isPresent());
959  }
960
961  @Test
962  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
963    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
964    long anyValue = 10;
965
966    // We'll check that it uses correct config and propagates it appropriately by going thru
967    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
968    // a number we pass in is higher than some config value, inside compactionPolicy.
969    Configuration conf = HBaseConfiguration.create();
970    conf.setLong(CONFIG_KEY, anyValue);
971    init(name.getMethodName() + "-xml", conf);
972    assertTrue(store.throttleCompaction(anyValue + 1));
973    assertFalse(store.throttleCompaction(anyValue));
974
975    // HTD overrides XML.
976    --anyValue;
977    init(
978      name.getMethodName() + "-htd", conf, TableDescriptorBuilder
979        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
980      ColumnFamilyDescriptorBuilder.of(family));
981    assertTrue(store.throttleCompaction(anyValue + 1));
982    assertFalse(store.throttleCompaction(anyValue));
983
984    // HCD overrides them both.
985    --anyValue;
986    init(name.getMethodName() + "-hcd", conf,
987      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
988        Long.toString(anyValue)),
989      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
990        .build());
991    assertTrue(store.throttleCompaction(anyValue + 1));
992    assertFalse(store.throttleCompaction(anyValue));
993  }
994
995  public static class DummyStoreEngine extends DefaultStoreEngine {
996    public static DefaultCompactor lastCreatedCompactor = null;
997
998    @Override
999    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
1000      throws IOException {
1001      super.createComponents(conf, store, comparator);
1002      lastCreatedCompactor = this.compactor;
1003    }
1004  }
1005
1006  @Test
1007  public void testStoreUsesSearchEngineOverride() throws Exception {
1008    Configuration conf = HBaseConfiguration.create();
1009    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
1010    init(this.name.getMethodName(), conf);
1011    assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor());
1012  }
1013
1014  private void addStoreFile() throws IOException {
1015    HStoreFile f = this.store.getStorefiles().iterator().next();
1016    Path storedir = f.getPath().getParent();
1017    long seqid = this.store.getMaxSequenceId().orElse(0L);
1018    Configuration c = TEST_UTIL.getConfiguration();
1019    FileSystem fs = FileSystem.get(c);
1020    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
1021    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
1022      .withOutputDir(storedir).withFileContext(fileContext).build();
1023    w.appendMetadata(seqid + 1, false);
1024    w.close();
1025    LOG.info("Added store file:" + w.getPath());
1026  }
1027
1028  private void archiveStoreFile(int index) throws IOException {
1029    Collection<HStoreFile> files = this.store.getStorefiles();
1030    HStoreFile sf = null;
1031    Iterator<HStoreFile> it = files.iterator();
1032    for (int i = 0; i <= index; i++) {
1033      sf = it.next();
1034    }
1035    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(),
1036      Lists.newArrayList(sf));
1037  }
1038
1039  private void closeCompactedFile(int index) throws IOException {
1040    Collection<HStoreFile> files =
1041      this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
1042    if (files.size() > 0) {
1043      HStoreFile sf = null;
1044      Iterator<HStoreFile> it = files.iterator();
1045      for (int i = 0; i <= index; i++) {
1046        sf = it.next();
1047      }
1048      sf.closeStoreFile(true);
1049      store.getStoreEngine().getStoreFileManager()
1050        .removeCompactedFiles(Collections.singletonList(sf));
1051    }
1052  }
1053
1054  @Test
1055  public void testRefreshStoreFiles() throws Exception {
1056    init(name.getMethodName());
1057
1058    assertEquals(0, this.store.getStorefilesCount());
1059
1060    // Test refreshing store files when no store files are there
1061    store.refreshStoreFiles();
1062    assertEquals(0, this.store.getStorefilesCount());
1063
1064    // add some data, flush
1065    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
1066    flush(1);
1067    assertEquals(1, this.store.getStorefilesCount());
1068
1069    // add one more file
1070    addStoreFile();
1071
1072    assertEquals(1, this.store.getStorefilesCount());
1073    store.refreshStoreFiles();
1074    assertEquals(2, this.store.getStorefilesCount());
1075
1076    // add three more files
1077    addStoreFile();
1078    addStoreFile();
1079    addStoreFile();
1080
1081    assertEquals(2, this.store.getStorefilesCount());
1082    store.refreshStoreFiles();
1083    assertEquals(5, this.store.getStorefilesCount());
1084
1085    closeCompactedFile(0);
1086    archiveStoreFile(0);
1087
1088    assertEquals(5, this.store.getStorefilesCount());
1089    store.refreshStoreFiles();
1090    assertEquals(4, this.store.getStorefilesCount());
1091
1092    archiveStoreFile(0);
1093    archiveStoreFile(1);
1094    archiveStoreFile(2);
1095
1096    assertEquals(4, this.store.getStorefilesCount());
1097    store.refreshStoreFiles();
1098    assertEquals(1, this.store.getStorefilesCount());
1099
1100    archiveStoreFile(0);
1101    store.refreshStoreFiles();
1102    assertEquals(0, this.store.getStorefilesCount());
1103  }
1104
1105  @Test
1106  public void testRefreshStoreFilesNotChanged() throws IOException {
1107    init(name.getMethodName());
1108
1109    assertEquals(0, this.store.getStorefilesCount());
1110
1111    // add some data, flush
1112    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
1113    flush(1);
1114    // add one more file
1115    addStoreFile();
1116
1117    StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());
1118
1119    // call first time after files changed
1120    spiedStoreEngine.refreshStoreFiles();
1121    assertEquals(2, this.store.getStorefilesCount());
1122    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
1123
1124    // call second time
1125    spiedStoreEngine.refreshStoreFiles();
1126
1127    // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
1128    // refreshed,
1129    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
1130  }
1131
1132  @Test
1133  public void testScanWithCompactionAfterFlush() throws Exception {
1134    TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
1135      EverythingPolicy.class.getName());
1136    init(name.getMethodName());
1137
1138    assertEquals(0, this.store.getStorefilesCount());
1139
1140    KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null);
1141    // add some data, flush
1142    this.store.add(kv, null);
1143    flush(1);
1144    kv = new KeyValue(row, family, qf2, 1, (byte[]) null);
1145    // add some data, flush
1146    this.store.add(kv, null);
1147    flush(2);
1148    kv = new KeyValue(row, family, qf3, 1, (byte[]) null);
1149    // add some data, flush
1150    this.store.add(kv, null);
1151    flush(3);
1152
1153    ExecutorService service = Executors.newFixedThreadPool(2);
1154
1155    Scan scan = new Scan(new Get(row));
1156    Future<KeyValueScanner> scanFuture = service.submit(() -> {
1157      try {
1158        LOG.info(">>>> creating scanner");
1159        return this.store.createScanner(scan,
1160          new ScanInfo(HBaseConfiguration.create(),
1161            ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(),
1162            Long.MAX_VALUE, 0, CellComparator.getInstance()),
1163          scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0);
1164      } catch (IOException e) {
1165        e.printStackTrace();
1166        return null;
1167      }
1168    });
1169    Future compactFuture = service.submit(() -> {
1170      try {
1171        LOG.info(">>>>>> starting compaction");
1172        Optional<CompactionContext> opCompaction = this.store.requestCompaction();
1173        assertTrue(opCompaction.isPresent());
1174        store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent());
1175        LOG.info(">>>>>> Compaction is finished");
1176        this.store.closeAndArchiveCompactedFiles();
1177        LOG.info(">>>>>> Compacted files deleted");
1178      } catch (IOException e) {
1179        e.printStackTrace();
1180      }
1181    });
1182
1183    KeyValueScanner kvs = scanFuture.get();
1184    compactFuture.get();
1185    ((StoreScanner) kvs).currentScanners.forEach(s -> {
1186      if (s instanceof StoreFileScanner) {
1187        assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount());
1188      }
1189    });
1190    kvs.seek(kv);
1191    service.shutdownNow();
1192  }
1193
1194  private long countMemStoreScanner(StoreScanner scanner) {
1195    if (scanner.currentScanners == null) {
1196      return 0;
1197    }
1198    return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count();
1199  }
1200
1201  @Test
1202  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
1203    long seqId = 100;
1204    long timestamp = EnvironmentEdgeManager.currentTime();
1205    ExtendedCell cell0 =
1206      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1207        .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1208    PrivateCellUtil.setSequenceId(cell0, seqId);
1209    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());
1210
1211    ExtendedCell cell1 =
1212      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1213        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1214    PrivateCellUtil.setSequenceId(cell1, seqId);
1215    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
1216
1217    seqId = 101;
1218    timestamp = EnvironmentEdgeManager.currentTime();
1219    ExtendedCell cell2 =
1220      ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
1221        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1222    PrivateCellUtil.setSequenceId(cell2, seqId);
1223    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
1224  }
1225
1226  private void testNumberOfMemStoreScannersAfterFlush(List<ExtendedCell> inputCellsBeforeSnapshot,
1227    List<ExtendedCell> inputCellsAfterSnapshot) throws IOException {
1228    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
1229    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1230    long seqId = Long.MIN_VALUE;
1231    for (ExtendedCell c : inputCellsBeforeSnapshot) {
1232      quals.add(CellUtil.cloneQualifier(c));
1233      seqId = Math.max(seqId, c.getSequenceId());
1234    }
1235    for (ExtendedCell c : inputCellsAfterSnapshot) {
1236      quals.add(CellUtil.cloneQualifier(c));
1237      seqId = Math.max(seqId, c.getSequenceId());
1238    }
1239    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
1240    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1241    storeFlushCtx.prepare();
1242    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
1243    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
1244    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
1245      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
1246      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
1247      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1248      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1249      // snapshot has no data after flush
1250      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
1251      boolean more;
1252      int cellCount = 0;
1253      do {
1254        List<Cell> cells = new ArrayList<>();
1255        more = s.next(cells);
1256        cellCount += cells.size();
1257        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
1258      } while (more);
1259      assertEquals(
1260        "The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
1261          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
1262        inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
1263      // the current scanners is cleared
1264      assertEquals(0, countMemStoreScanner(s));
1265    }
1266  }
1267
1268  private ExtendedCell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
1269    throws IOException {
1270    return createCell(row, qualifier, ts, sequenceId, value);
1271  }
1272
1273  private ExtendedCell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId,
1274    byte[] value) throws IOException {
1275    return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row)
1276      .setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put)
1277      .setValue(value).setSequenceId(sequenceId).build();
1278  }
1279
1280  @Test
1281  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
1282    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1283    final int expectedSize = 3;
1284    testFlushBeforeCompletingScan(new MyListHook() {
1285      @Override
1286      public void hook(int currentSize) {
1287        if (currentSize == expectedSize - 1) {
1288          try {
1289            flushStore(store, id++);
1290            timeToGoNextRow.set(true);
1291          } catch (IOException e) {
1292            throw new RuntimeException(e);
1293          }
1294        }
1295      }
1296    }, new FilterBase() {
1297      @Override
1298      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1299        return ReturnCode.INCLUDE;
1300      }
1301    }, expectedSize);
1302  }
1303
1304  @Test
1305  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
1306    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1307    final int expectedSize = 2;
1308    testFlushBeforeCompletingScan(new MyListHook() {
1309      @Override
1310      public void hook(int currentSize) {
1311        if (currentSize == expectedSize - 1) {
1312          try {
1313            flushStore(store, id++);
1314            timeToGoNextRow.set(true);
1315          } catch (IOException e) {
1316            throw new RuntimeException(e);
1317          }
1318        }
1319      }
1320    }, new FilterBase() {
1321      @Override
1322      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1323        if (timeToGoNextRow.get()) {
1324          timeToGoNextRow.set(false);
1325          return ReturnCode.NEXT_ROW;
1326        } else {
1327          return ReturnCode.INCLUDE;
1328        }
1329      }
1330    }, expectedSize);
1331  }
1332
1333  @Test
1334  public void testFlushBeforeCompletingScanWithFilterHint()
1335    throws IOException, InterruptedException {
1336    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
1337    final int expectedSize = 2;
1338    testFlushBeforeCompletingScan(new MyListHook() {
1339      @Override
1340      public void hook(int currentSize) {
1341        if (currentSize == expectedSize - 1) {
1342          try {
1343            flushStore(store, id++);
1344            timeToGetHint.set(true);
1345          } catch (IOException e) {
1346            throw new RuntimeException(e);
1347          }
1348        }
1349      }
1350    }, new FilterBase() {
1351      @Override
1352      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1353        if (timeToGetHint.get()) {
1354          timeToGetHint.set(false);
1355          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
1356        } else {
1357          return Filter.ReturnCode.INCLUDE;
1358        }
1359      }
1360
1361      @Override
1362      public Cell getNextCellHint(Cell currentCell) throws IOException {
1363        return currentCell;
1364      }
1365    }, expectedSize);
1366  }
1367
1368  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
1369    throws IOException, InterruptedException {
1370    Configuration conf = HBaseConfiguration.create();
1371    byte[] r0 = Bytes.toBytes("row0");
1372    byte[] r1 = Bytes.toBytes("row1");
1373    byte[] r2 = Bytes.toBytes("row2");
1374    byte[] value0 = Bytes.toBytes("value0");
1375    byte[] value1 = Bytes.toBytes("value1");
1376    byte[] value2 = Bytes.toBytes("value2");
1377    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1378    long ts = EnvironmentEdgeManager.currentTime();
1379    long seqId = 100;
1380    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1381      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
1382      new MyStoreHook() {
1383        @Override
1384        public long getSmallestReadPoint(HStore store) {
1385          return seqId + 3;
1386        }
1387      });
1388    // The cells having the value0 won't be flushed to disk because the value of max version is 1
1389    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
1390    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
1391    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
1392    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
1393    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
1394    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
1395    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
1396    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
1397    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
1398    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
1399    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
1400    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
1401    List<Cell> myList = new MyList<>(hook);
1402    Scan scan = new Scan().withStartRow(r1).setFilter(filter);
1403    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
1404      // r1
1405      scanner.next(myList);
1406      assertEquals(expectedSize, myList.size());
1407      for (Cell c : myList) {
1408        byte[] actualValue = CellUtil.cloneValue(c);
1409        assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:"
1410          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
1411      }
1412      List<Cell> normalList = new ArrayList<>(3);
1413      // r2
1414      scanner.next(normalList);
1415      assertEquals(3, normalList.size());
1416      for (Cell c : normalList) {
1417        byte[] actualValue = CellUtil.cloneValue(c);
1418        assertTrue("expected:" + Bytes.toStringBinary(value2) + ", actual:"
1419          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value2));
1420      }
1421    }
1422  }
1423
1424  @Test
1425  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
1426    Configuration conf = HBaseConfiguration.create();
1427    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
1428    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1429      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1430    byte[] value = Bytes.toBytes("value");
1431    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1432    long ts = EnvironmentEdgeManager.currentTime();
1433    long seqId = 100;
1434    // older data whihc shouldn't be "seen" by client
1435    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1436    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1437    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1438    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1439    quals.add(qf1);
1440    quals.add(qf2);
1441    quals.add(qf3);
1442    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1443    MyCompactingMemStore.START_TEST.set(true);
1444    Runnable flush = () -> {
1445      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
1446      // recreate the active memstore -- phase (4/5)
1447      storeFlushCtx.prepare();
1448    };
1449    ExecutorService service = Executors.newSingleThreadExecutor();
1450    service.execute(flush);
1451    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
1452    // this is blocked until we recreate the active memstore -- phase (3/5)
1453    // we get scanner from active memstore but it is empty -- phase (5/5)
1454    InternalScanner scanner =
1455      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
1456    service.shutdown();
1457    service.awaitTermination(20, TimeUnit.SECONDS);
1458    try {
1459      try {
1460        List<Cell> results = new ArrayList<>();
1461        scanner.next(results);
1462        assertEquals(3, results.size());
1463        for (Cell c : results) {
1464          byte[] actualValue = CellUtil.cloneValue(c);
1465          assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
1466            + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
1467        }
1468      } finally {
1469        scanner.close();
1470      }
1471    } finally {
1472      MyCompactingMemStore.START_TEST.set(false);
1473      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1474      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1475    }
1476  }
1477
1478  @Test
1479  public void testScanWithDoubleFlush() throws IOException {
1480    Configuration conf = HBaseConfiguration.create();
1481    // Initialize region
1482    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1483      @Override
1484      public void getScanners(MyStore store) throws IOException {
1485        final long tmpId = id++;
1486        ExecutorService s = Executors.newSingleThreadExecutor();
1487        s.execute(() -> {
1488          try {
1489            // flush the store before storescanner updates the scanners from store.
1490            // The current data will be flushed into files, and the memstore will
1491            // be clear.
1492            // -- phase (4/4)
1493            flushStore(store, tmpId);
1494          } catch (IOException ex) {
1495            throw new RuntimeException(ex);
1496          }
1497        });
1498        s.shutdown();
1499        try {
1500          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
1501          s.awaitTermination(3, TimeUnit.SECONDS);
1502        } catch (InterruptedException ex) {
1503        }
1504      }
1505    });
1506    byte[] oldValue = Bytes.toBytes("oldValue");
1507    byte[] currentValue = Bytes.toBytes("currentValue");
1508    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1509    long ts = EnvironmentEdgeManager.currentTime();
1510    long seqId = 100;
1511    // older data whihc shouldn't be "seen" by client
1512    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
1513    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
1514    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
1515    long snapshotId = id++;
1516    // push older data into snapshot -- phase (1/4)
1517    StoreFlushContext storeFlushCtx =
1518      store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
1519    storeFlushCtx.prepare();
1520
1521    // insert current data into active -- phase (2/4)
1522    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
1523    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
1524    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
1525    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1526    quals.add(qf1);
1527    quals.add(qf2);
1528    quals.add(qf3);
1529    try (InternalScanner scanner =
1530      (InternalScanner) myStore.getScanner(new Scan(new Get(row)), quals, seqId + 1)) {
1531      // complete the flush -- phase (3/4)
1532      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1533      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1534
1535      List<Cell> results = new ArrayList<>();
1536      scanner.next(results);
1537      assertEquals(3, results.size());
1538      for (Cell c : results) {
1539        byte[] actualValue = CellUtil.cloneValue(c);
1540        assertTrue("expected:" + Bytes.toStringBinary(currentValue) + ", actual:"
1541          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, currentValue));
1542      }
1543    }
1544  }
1545
1546  /**
1547   * This test is for HBASE-27519, when the {@link StoreScanner} is scanning,the Flush and the
1548   * Compaction execute concurrently and theCcompaction compact and archive the flushed
1549   * {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}.Before
1550   * HBASE-27519,{@link StoreScanner.updateReaders} would throw {@link FileNotFoundException}.
1551   */
1552  @Test
1553  public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
1554    Configuration conf = HBaseConfiguration.create();
1555    conf.setBoolean(WALFactory.WAL_ENABLED, false);
1556    conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
1557    byte[] r0 = Bytes.toBytes("row0");
1558    byte[] r1 = Bytes.toBytes("row1");
1559    final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
1560    final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
1561    // Initialize region
1562    final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1563      @Override
1564      public void getScanners(MyStore store) throws IOException {
1565        try {
1566          // Here this method is called by StoreScanner.updateReaders which is invoked by the
1567          // following TestHStore.flushStore
1568          if (shouldWaitRef.get()) {
1569            // wait the following compaction Task start
1570            cyclicBarrier.await();
1571            // wait the following HStore.closeAndArchiveCompactedFiles end.
1572            cyclicBarrier.await();
1573          }
1574        } catch (BrokenBarrierException | InterruptedException e) {
1575          throw new RuntimeException(e);
1576        }
1577      }
1578    });
1579
1580    final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null);
1581    Runnable compactionTask = () -> {
1582      try {
1583        // Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for
1584        // entering the MyStore.getScanners, compactionTask could start.
1585        cyclicBarrier.await();
1586        region.compactStore(family, new NoLimitThroughputController());
1587        myStore.closeAndArchiveCompactedFiles();
1588        // Notify StoreScanner.updateReaders could enter MyStore.getScanners.
1589        cyclicBarrier.await();
1590      } catch (Throwable e) {
1591        compactionExceptionRef.set(e);
1592      }
1593    };
1594
1595    long ts = EnvironmentEdgeManager.currentTime();
1596    long seqId = 100;
1597    byte[] value = Bytes.toBytes("value");
1598    // older data whihc shouldn't be "seen" by client
1599    myStore.add(createCell(r0, qf1, ts, seqId, value), null);
1600    flushStore(myStore, id++);
1601    myStore.add(createCell(r0, qf2, ts, seqId, value), null);
1602    flushStore(myStore, id++);
1603    myStore.add(createCell(r0, qf3, ts, seqId, value), null);
1604    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1605    quals.add(qf1);
1606    quals.add(qf2);
1607    quals.add(qf3);
1608
1609    myStore.add(createCell(r1, qf1, ts, seqId, value), null);
1610    myStore.add(createCell(r1, qf2, ts, seqId, value), null);
1611    myStore.add(createCell(r1, qf3, ts, seqId, value), null);
1612
1613    Thread.currentThread()
1614      .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
1615    Scan scan = new Scan();
1616    scan.withStartRow(r0, true);
1617    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
1618      List<Cell> results = new MyList<>(size -> {
1619        switch (size) {
1620          case 1:
1621            shouldWaitRef.set(true);
1622            Thread thread = new Thread(compactionTask);
1623            thread.setName("MyCompacting Thread.");
1624            thread.start();
1625            try {
1626              flushStore(myStore, id++);
1627              thread.join();
1628            } catch (IOException | InterruptedException e) {
1629              throw new RuntimeException(e);
1630            }
1631            shouldWaitRef.set(false);
1632            break;
1633          default:
1634            break;
1635        }
1636      });
1637      // Before HBASE-27519, here would throw java.io.FileNotFoundException because the storeFile
1638      // which used by StoreScanner.updateReaders is deleted by compactionTask.
1639      scanner.next(results);
1640      // The results is r0 row cells.
1641      assertEquals(3, results.size());
1642      assertTrue(compactionExceptionRef.get() == null);
1643    }
1644  }
1645
1646  @Test
1647  public void testReclaimChunkWhenScaning() throws IOException {
1648    init("testReclaimChunkWhenScaning");
1649    long ts = EnvironmentEdgeManager.currentTime();
1650    long seqId = 100;
1651    byte[] value = Bytes.toBytes("value");
1652    // older data whihc shouldn't be "seen" by client
1653    store.add(createCell(qf1, ts, seqId, value), null);
1654    store.add(createCell(qf2, ts, seqId, value), null);
1655    store.add(createCell(qf3, ts, seqId, value), null);
1656    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1657    quals.add(qf1);
1658    quals.add(qf2);
1659    quals.add(qf3);
1660    try (InternalScanner scanner =
1661      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) {
1662      List<Cell> results = new MyList<>(size -> {
1663        switch (size) {
1664          // 1) we get the first cell (qf1)
1665          // 2) flush the data to have StoreScanner update inner scanners
1666          // 3) the chunk will be reclaimed after updaing
1667          case 1:
1668            try {
1669              flushStore(store, id++);
1670            } catch (IOException e) {
1671              throw new RuntimeException(e);
1672            }
1673            break;
1674          // 1) we get the second cell (qf2)
1675          // 2) add some cell to fill some byte into the chunk (we have only one chunk)
1676          case 2:
1677            try {
1678              byte[] newValue = Bytes.toBytes("newValue");
1679              // older data whihc shouldn't be "seen" by client
1680              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
1681              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
1682              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
1683            } catch (IOException e) {
1684              throw new RuntimeException(e);
1685            }
1686            break;
1687          default:
1688            break;
1689        }
1690      });
1691      scanner.next(results);
1692      assertEquals(3, results.size());
1693      for (Cell c : results) {
1694        byte[] actualValue = CellUtil.cloneValue(c);
1695        assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
1696          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
1697      }
1698    }
1699  }
1700
1701  /**
1702   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the
1703   * versionedList. And the first InMemoryFlushRunnable will use the chagned versionedList to remove
1704   * the corresponding segments. In short, there will be some segements which isn't in merge are
1705   * removed.
1706   */
1707  @Test
1708  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
1709    int flushSize = 500;
1710    Configuration conf = HBaseConfiguration.create();
1711    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
1712    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
1713    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
1714    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
1715    // Set the lower threshold to invoke the "MERGE" policy
1716    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
1717    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1718      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1719    byte[] value = Bytes.toBytes("thisisavarylargevalue");
1720    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1721    long ts = EnvironmentEdgeManager.currentTime();
1722    long seqId = 100;
1723    // older data whihc shouldn't be "seen" by client
1724    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1725    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1726    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1727    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1728    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1729    storeFlushCtx.prepare();
1730    // This shouldn't invoke another in-memory flush because the first compactor thread
1731    // hasn't accomplished the in-memory compaction.
1732    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1733    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1734    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1735    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1736    // okay. Let the compaction be completed
1737    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
1738    CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore;
1739    while (mem.isMemStoreFlushingInMemory()) {
1740      TimeUnit.SECONDS.sleep(1);
1741    }
1742    // This should invoke another in-memory flush.
1743    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1744    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1745    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1746    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1747    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
1748      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
1749    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1750    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1751  }
1752
1753  @Test
1754  public void testAge() throws IOException {
1755    long currentTime = EnvironmentEdgeManager.currentTime();
1756    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
1757    edge.setValue(currentTime);
1758    EnvironmentEdgeManager.injectEdge(edge);
1759    Configuration conf = TEST_UTIL.getConfiguration();
1760    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
1761    initHRegion(name.getMethodName(), conf,
1762      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
1763    HStore store = new HStore(region, hcd, conf, false) {
1764
1765      @Override
1766      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
1767        CellComparator kvComparator) throws IOException {
1768        List<HStoreFile> storefiles =
1769          Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
1770            mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
1771        StoreFileManager sfm = mock(StoreFileManager.class);
1772        when(sfm.getStoreFiles()).thenReturn(storefiles);
1773        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
1774        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
1775        return storeEngine;
1776      }
1777    };
1778    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
1779    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
1780    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
1781  }
1782
1783  private HStoreFile mockStoreFile(long createdTime) {
1784    StoreFileInfo info = mock(StoreFileInfo.class);
1785    when(info.getCreatedTimestamp()).thenReturn(createdTime);
1786    HStoreFile sf = mock(HStoreFile.class);
1787    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
1788    when(sf.isHFile()).thenReturn(true);
1789    when(sf.getFileInfo()).thenReturn(info);
1790    return sf;
1791  }
1792
1793  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
1794    throws IOException {
1795    return (MyStore) init(methodName, conf,
1796      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1797      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
1798  }
1799
1800  private static class MyStore extends HStore {
1801    private final MyStoreHook hook;
1802
1803    MyStore(final HRegion region, final ColumnFamilyDescriptor family,
1804      final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
1805      super(region, family, confParam, false);
1806      this.hook = hook;
1807    }
1808
1809    @Override
1810    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1811      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1812      boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1813      boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException {
1814      hook.getScanners(this);
1815      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
1816        stopRow, false, readPt, includeMemstoreScanner, onlyLatestVersion);
1817    }
1818
1819    @Override
1820    public long getSmallestReadPoint() {
1821      return hook.getSmallestReadPoint(this);
1822    }
1823  }
1824
1825  private abstract static class MyStoreHook {
1826
1827    void getScanners(MyStore store) throws IOException {
1828    }
1829
1830    long getSmallestReadPoint(HStore store) {
1831      return store.getHRegion().getSmallestReadPoint();
1832    }
1833  }
1834
1835  @Test
1836  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
1837    Configuration conf = HBaseConfiguration.create();
1838    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1839    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
1840    // Set the lower threshold to invoke the "MERGE" policy
1841    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1842    });
1843    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1844    long ts = EnvironmentEdgeManager.currentTime();
1845    long seqID = 1L;
1846    // Add some data to the region and do some flushes
1847    for (int i = 1; i < 10; i++) {
1848      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1849        memStoreSizing);
1850    }
1851    // flush them
1852    flushStore(store, seqID);
1853    for (int i = 11; i < 20; i++) {
1854      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1855        memStoreSizing);
1856    }
1857    // flush them
1858    flushStore(store, seqID);
1859    for (int i = 21; i < 30; i++) {
1860      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1861        memStoreSizing);
1862    }
1863    // flush them
1864    flushStore(store, seqID);
1865
1866    assertEquals(3, store.getStorefilesCount());
1867    Scan scan = new Scan();
1868    scan.addFamily(family);
1869    Collection<HStoreFile> storefiles2 = store.getStorefiles();
1870    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
1871    StoreScanner storeScanner =
1872      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1873    // get the current heap
1874    KeyValueHeap heap = storeScanner.heap;
1875    // create more store files
1876    for (int i = 31; i < 40; i++) {
1877      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1878        memStoreSizing);
1879    }
1880    // flush them
1881    flushStore(store, seqID);
1882
1883    for (int i = 41; i < 50; i++) {
1884      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1885        memStoreSizing);
1886    }
1887    // flush them
1888    flushStore(store, seqID);
1889    storefiles2 = store.getStorefiles();
1890    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
1891    actualStorefiles1.removeAll(actualStorefiles);
1892    // Do compaction
1893    MyThread thread = new MyThread(storeScanner);
1894    thread.start();
1895    store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
1896    thread.join();
1897    KeyValueHeap heap2 = thread.getHeap();
1898    assertFalse(heap.equals(heap2));
1899  }
1900
1901  @Test
1902  public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception {
1903    Configuration conf = HBaseConfiguration.create();
1904    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1905    // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type.
1906    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1);
1907    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1908    });
1909    Scan scan = new Scan();
1910    scan.addFamily(family);
1911    // ReadType on Scan is still DEFAULT only.
1912    assertEquals(ReadType.DEFAULT, scan.getReadType());
1913    StoreScanner storeScanner =
1914      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1915    assertFalse(storeScanner.isScanUsePread());
1916  }
1917
1918  @Test
1919  public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException {
1920    Configuration conf = HBaseConfiguration.create();
1921    conf.set("hbase.systemtables.compacting.memstore.type", "eager");
1922    init(name.getMethodName(), conf,
1923      TableDescriptorBuilder.newBuilder(
1924        TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())),
1925      ColumnFamilyDescriptorBuilder.newBuilder(family)
1926        .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build());
1927    assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString()
1928      .startsWith("eager".toUpperCase()));
1929  }
1930
1931  @Test
1932  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
1933    final TableName tn = TableName.valueOf(name.getMethodName());
1934    init(name.getMethodName());
1935
1936    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();
1937
1938    HStoreFile sf1 = mockStoreFileWithLength(1024L);
1939    HStoreFile sf2 = mockStoreFileWithLength(2048L);
1940    HStoreFile sf3 = mockStoreFileWithLength(4096L);
1941    HStoreFile sf4 = mockStoreFileWithLength(8192L);
1942
1943    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
1944      .setEndKey(Bytes.toBytes("b")).build();
1945
1946    // Compacting two files down to one, reducing size
1947    sizeStore.put(regionInfo, 1024L + 4096L);
1948    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3),
1949      Arrays.asList(sf2));
1950
1951    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1952
1953    // The same file length in and out should have no change
1954    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
1955      Arrays.asList(sf2));
1956
1957    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1958
1959    // Increase the total size used
1960    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
1961      Arrays.asList(sf3));
1962
1963    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());
1964
1965    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
1966      .setEndKey(Bytes.toBytes("c")).build();
1967    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));
1968
1969    assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
1970  }
1971
1972  @Test
1973  public void testHFileContextSetWithCFAndTable() throws Exception {
1974    init(this.name.getMethodName());
1975    StoreFileWriter writer = store.getStoreEngine()
1976      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L)
1977        .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true)
1978        .includesTag(false).shouldDropBehind(true));
1979    HFileContext hFileContext = writer.getLiveFileWriter().getFileContext();
1980    assertArrayEquals(family, hFileContext.getColumnFamily());
1981    assertArrayEquals(table, hFileContext.getTableName());
1982  }
1983
1984  // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell
1985  // but its dataSize exceeds inmemoryFlushSize
1986  @Test
1987  public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize()
1988    throws IOException, InterruptedException {
1989    Configuration conf = HBaseConfiguration.create();
1990
1991    byte[] smallValue = new byte[3];
1992    byte[] largeValue = new byte[9];
1993    final long timestamp = EnvironmentEdgeManager.currentTime();
1994    final long seqId = 100;
1995    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
1996    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
1997    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
1998    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
1999    int flushByteSize = smallCellByteSize + largeCellByteSize - 2;
2000
2001    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2002    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName());
2003    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2004    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2005
2006    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2007      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2008
2009    MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
2010    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2011    myCompactingMemStore.smallCellPreUpdateCounter.set(0);
2012    myCompactingMemStore.largeCellPreUpdateCounter.set(0);
2013
2014    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2015    Thread smallCellThread = new Thread(() -> {
2016      try {
2017        store.add(smallCell, new NonThreadSafeMemStoreSizing());
2018      } catch (Throwable exception) {
2019        exceptionRef.set(exception);
2020      }
2021    });
2022    smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
2023    smallCellThread.start();
2024
2025    String oldThreadName = Thread.currentThread().getName();
2026    try {
2027      /**
2028       * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
2029       * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread
2030       * invokes flushInMemory.
2031       * <p/>
2032       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
2033       * can add cell to currentActive . That is to say when largeCellThread called flushInMemory
2034       * method, CompactingMemStore.active has no cell.
2035       */
2036      Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME);
2037      store.add(largeCell, new NonThreadSafeMemStoreSizing());
2038      smallCellThread.join();
2039
2040      for (int i = 0; i < 100; i++) {
2041        long currentTimestamp = timestamp + 100 + i;
2042        ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
2043        store.add(cell, new NonThreadSafeMemStoreSizing());
2044      }
2045    } finally {
2046      Thread.currentThread().setName(oldThreadName);
2047    }
2048
2049    assertTrue(exceptionRef.get() == null);
2050
2051  }
2052
2053  // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds
2054  // InmemoryFlushSize
2055  @Test(timeout = 60000)
2056  public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception {
2057    Configuration conf = HBaseConfiguration.create();
2058    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
2059
2060    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2061      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2062
2063    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
2064
2065    int size = (int) (myCompactingMemStore.getInmemoryFlushSize());
2066    byte[] value = new byte[size + 1];
2067
2068    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2069    long timestamp = EnvironmentEdgeManager.currentTime();
2070    long seqId = 100;
2071    ExtendedCell cell = createCell(qf1, timestamp, seqId, value);
2072    int cellByteSize = MutableSegment.getCellLength(cell);
2073    store.add(cell, memStoreSizing);
2074    assertTrue(memStoreSizing.getCellsCount() == 1);
2075    assertTrue(memStoreSizing.getDataSize() == cellByteSize);
2076    // Waiting the in memory compaction completed, see HBASE-26438
2077    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2078  }
2079
2080  /**
2081   * This test is for HBASE-27464, before this JIRA,when init {@link CellChunkImmutableSegment} for
2082   * 'COMPACT' action, we not force copy to current MSLab. When cell size bigger than
2083   * {@link MemStoreLABImpl#maxAlloc}, cell will stay in previous chunk which will recycle after
2084   * segment replace, and we may read wrong data when these chunk reused by others.
2085   */
2086  @Test
2087  public void testForceCloneOfBigCellForCellChunkImmutableSegment() throws Exception {
2088    Configuration conf = HBaseConfiguration.create();
2089    int maxAllocByteSize = conf.getInt(MemStoreLAB.MAX_ALLOC_KEY, MemStoreLAB.MAX_ALLOC_DEFAULT);
2090
2091    // Construct big cell,which is large than {@link MemStoreLABImpl#maxAlloc}.
2092    byte[] cellValue = new byte[maxAllocByteSize + 1];
2093    final long timestamp = EnvironmentEdgeManager.currentTime();
2094    final long seqId = 100;
2095    final byte[] rowKey1 = Bytes.toBytes("rowKey1");
2096    final ExtendedCell originalCell1 = createCell(rowKey1, qf1, timestamp, seqId, cellValue);
2097    final byte[] rowKey2 = Bytes.toBytes("rowKey2");
2098    final ExtendedCell originalCell2 = createCell(rowKey2, qf1, timestamp, seqId, cellValue);
2099    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2100    quals.add(qf1);
2101
2102    int cellByteSize = MutableSegment.getCellLength(originalCell1);
2103    int inMemoryFlushByteSize = cellByteSize - 1;
2104
2105    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2106    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
2107    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2108    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(inMemoryFlushByteSize * 200));
2109    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2110
2111    // Use {@link MemoryCompactionPolicy#EAGER} for always compacting.
2112    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2113      .setInMemoryCompaction(MemoryCompactionPolicy.EAGER).build());
2114
2115    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
2116    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == inMemoryFlushByteSize);
2117
2118    // Data chunk Pool is disabled.
2119    assertTrue(ChunkCreator.getInstance().getMaxCount(ChunkType.DATA_CHUNK) == 0);
2120
2121    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2122
2123    // First compact
2124    store.add(originalCell1, memStoreSizing);
2125    // Waiting for the first in-memory compaction finished
2126    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2127
2128    StoreScanner storeScanner =
2129      (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
2130    SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2131    ExtendedCell resultCell1 = segmentScanner.next();
2132    assertTrue(PrivateCellUtil.equals(resultCell1, originalCell1));
2133    int cell1ChunkId = resultCell1.getChunkId();
2134    assertTrue(cell1ChunkId != ExtendedCell.CELL_NOT_BASED_ON_CHUNK);
2135    assertNull(segmentScanner.next());
2136    segmentScanner.close();
2137    storeScanner.close();
2138    Segment segment = segmentScanner.segment;
2139    assertTrue(segment instanceof CellChunkImmutableSegment);
2140    MemStoreLABImpl memStoreLAB1 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2141    assertTrue(!memStoreLAB1.isClosed());
2142    assertTrue(!memStoreLAB1.chunks.isEmpty());
2143    assertTrue(!memStoreLAB1.isReclaimed());
2144
2145    // Second compact
2146    store.add(originalCell2, memStoreSizing);
2147    // Waiting for the second in-memory compaction finished
2148    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2149
2150    // Before HBASE-27464, here may throw java.lang.IllegalArgumentException: In CellChunkMap, cell
2151    // must be associated with chunk.. We were looking for a cell at index 0.
2152    // The cause for this exception is because the data chunk Pool is disabled,when the data chunks
2153    // are recycled after the second in-memory compaction finished,the
2154    // {@link ChunkCreator.putbackChunks} method does not put the chunks back to the data chunk
2155    // pool,it just removes them from {@link ChunkCreator#chunkIdMap},so in
2156    // {@link CellChunkMap#getCell} we could not get the data chunk by chunkId.
2157    storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
2158    segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2159    ExtendedCell newResultCell1 = segmentScanner.next();
2160    assertTrue(newResultCell1 != resultCell1);
2161    assertTrue(PrivateCellUtil.equals(newResultCell1, originalCell1));
2162
2163    ExtendedCell resultCell2 = segmentScanner.next();
2164    assertTrue(PrivateCellUtil.equals(resultCell2, originalCell2));
2165    assertNull(segmentScanner.next());
2166    segmentScanner.close();
2167    storeScanner.close();
2168
2169    segment = segmentScanner.segment;
2170    assertTrue(segment instanceof CellChunkImmutableSegment);
2171    MemStoreLABImpl memStoreLAB2 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2172    assertTrue(!memStoreLAB2.isClosed());
2173    assertTrue(!memStoreLAB2.chunks.isEmpty());
2174    assertTrue(!memStoreLAB2.isReclaimed());
2175    assertTrue(memStoreLAB1.isClosed());
2176    assertTrue(memStoreLAB1.chunks.isEmpty());
2177    assertTrue(memStoreLAB1.isReclaimed());
2178  }
2179
2180  // This test is for HBASE-26210 also, test write large cell and small cell concurrently when
2181  // InmemoryFlushSize is smaller,equal with and larger than cell size.
2182  @Test
2183  public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently()
2184    throws IOException, InterruptedException {
2185    doWriteTestLargeCellAndSmallCellConcurrently(
2186      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1);
2187    doWriteTestLargeCellAndSmallCellConcurrently(
2188      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize);
2189    doWriteTestLargeCellAndSmallCellConcurrently(
2190      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1);
2191    doWriteTestLargeCellAndSmallCellConcurrently(
2192      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize);
2193    doWriteTestLargeCellAndSmallCellConcurrently(
2194      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1);
2195  }
2196
2197  private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize)
2198    throws IOException, InterruptedException {
2199
2200    Configuration conf = HBaseConfiguration.create();
2201
2202    byte[] smallValue = new byte[3];
2203    byte[] largeValue = new byte[100];
2204    final long timestamp = EnvironmentEdgeManager.currentTime();
2205    final long seqId = 100;
2206    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2207    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2208    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2209    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2210    int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize);
2211    boolean flushByteSizeLessThanSmallAndLargeCellSize =
2212      flushByteSize < (smallCellByteSize + largeCellByteSize);
2213
2214    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName());
2215    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2216    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2217
2218    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2219      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2220
2221    MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore);
2222    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2223    myCompactingMemStore.disableCompaction();
2224    if (flushByteSizeLessThanSmallAndLargeCellSize) {
2225      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true;
2226    } else {
2227      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false;
2228    }
2229
2230    final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
2231    final AtomicLong totalCellByteSize = new AtomicLong(0);
2232    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2233    Thread smallCellThread = new Thread(() -> {
2234      try {
2235        for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
2236          long currentTimestamp = timestamp + i;
2237          ExtendedCell cell = createCell(qf1, currentTimestamp, seqId, smallValue);
2238          totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
2239          store.add(cell, memStoreSizing);
2240        }
2241      } catch (Throwable exception) {
2242        exceptionRef.set(exception);
2243
2244      }
2245    });
2246    smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME);
2247    smallCellThread.start();
2248
2249    String oldThreadName = Thread.currentThread().getName();
2250    try {
2251      /**
2252       * When flushByteSizeLessThanSmallAndLargeCellSize is true:
2253       * </p>
2254       * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then
2255       * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then
2256       * largeCellThread invokes flushInMemory.
2257       * <p/>
2258       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
2259       * can run into MyCompactingMemStore3.checkAndAddToActiveSize again.
2260       * <p/>
2261       * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and
2262       * largeCellThread concurrently write one cell and wait each other, and then write another
2263       * cell etc.
2264       */
2265      Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME);
2266      for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
2267        long currentTimestamp = timestamp + i;
2268        ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
2269        totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
2270        store.add(cell, memStoreSizing);
2271      }
2272      smallCellThread.join();
2273
2274      assertTrue(exceptionRef.get() == null);
2275      assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2));
2276      assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get());
2277      if (flushByteSizeLessThanSmallAndLargeCellSize) {
2278        assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT);
2279      } else {
2280        assertTrue(
2281          myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1));
2282      }
2283    } finally {
2284      Thread.currentThread().setName(oldThreadName);
2285    }
2286  }
2287
2288  /**
2289   * <pre>
2290   * This test is for HBASE-26384,
2291   * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()}
2292   * execute concurrently.
2293   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
2294   * for both branch-2 and master):
2295   * 1. The {@link CompactingMemStore} size exceeds
2296   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
2297   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
2298   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
2299   * 2. The in memory compact thread starts and then stopping before
2300   *    {@link CompactingMemStore#flattenOneSegment}.
2301   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
2302   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
2303   *    compact thread continues.
2304   *    Assuming {@link VersionedSegmentsList#version} returned from
2305   *    {@link CompactingMemStore#getImmutableSegments} is v.
2306   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
2307   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2308   *    {@link CompactionPipeline#version} is still v.
2309   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2310   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2311   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
2312   *    {@link CompactionPipeline} has changed because
2313   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
2314   *    removed in fact and still remaining in {@link CompactionPipeline}.
2315   *
2316   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior:
2317   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2318   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
2319   *    v+1.
2320   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2321   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2322   *    failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
2323   *    again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
2324   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.
2325   * </pre>
2326   */
2327  @Test
2328  public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
2329    Configuration conf = HBaseConfiguration.create();
2330
2331    byte[] smallValue = new byte[3];
2332    byte[] largeValue = new byte[9];
2333    final long timestamp = EnvironmentEdgeManager.currentTime();
2334    final long seqId = 100;
2335    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2336    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2337    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2338    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2339    int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
2340    int flushByteSize = totalCellByteSize - 2;
2341
2342    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2343    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
2344    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2345    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2346
2347    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2348      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2349
2350    MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
2351    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2352
2353    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2354    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2355
2356    String oldThreadName = Thread.currentThread().getName();
2357    try {
2358      Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
2359      /**
2360       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
2361       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
2362       * would invoke {@link CompactingMemStore#stopCompaction}.
2363       */
2364      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2365
2366      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2367      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2368
2369      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2370      assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
2371      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2372      assertTrue(segments.getNumOfSegments() == 0);
2373      assertTrue(segments.getNumOfCells() == 0);
2374      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
2375      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2376    } finally {
2377      Thread.currentThread().setName(oldThreadName);
2378    }
2379  }
2380
2381  /**
2382   * <pre>
2383   * This test is for HBASE-26384,
2384   * test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()}
2385   * and writeMemStore execute concurrently.
2386   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
2387   * for both branch-2 and master):
2388   * 1. The {@link CompactingMemStore} size exceeds
2389   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
2390   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
2391   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
2392   * 2. The in memory compact thread starts and then stopping before
2393   *    {@link CompactingMemStore#flattenOneSegment}.
2394   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
2395   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
2396   *    compact thread continues.
2397   *    Assuming {@link VersionedSegmentsList#version} returned from
2398   *    {@link CompactingMemStore#getImmutableSegments} is v.
2399   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
2400   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2401   *    {@link CompactionPipeline#version} is still v.
2402   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2403   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2404   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
2405   *    {@link CompactionPipeline} has changed because
2406   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
2407   *    removed in fact and still remaining in {@link CompactionPipeline}.
2408   *
2409   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior,
2410   * and I add step 7-8 to test there is new segment added before retry.
2411   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2412   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
2413   *     v+1.
2414   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2415   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2416   *    failed and retry,{@link VersionedSegmentsList#version} returned from
2417   *    {@link CompactingMemStore#getImmutableSegments} is v+1.
2418   * 7. The write thread continues writing to {@link CompactingMemStore} and
2419   *    {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
2420   *    {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
2421   *    {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
2422   *    {@link CompactionPipeline#version} is still v+1.
2423   * 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2424   *    {@link CompactionPipeline#version} is still v+1,
2425   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment}
2426   *    remained at the head of {@link CompactingMemStore#pipeline},the old is removed by
2427   *    {@link CompactingMemStore#swapPipelineWithNull}.
2428   * </pre>
2429   */
2430  @Test
2431  public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception {
2432    Configuration conf = HBaseConfiguration.create();
2433
2434    byte[] smallValue = new byte[3];
2435    byte[] largeValue = new byte[9];
2436    final long timestamp = EnvironmentEdgeManager.currentTime();
2437    final long seqId = 100;
2438    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2439    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2440    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2441    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2442    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2443    int flushByteSize = firstWriteCellByteSize - 2;
2444
2445    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2446    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
2447    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2448    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2449
2450    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2451      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2452
2453    final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
2454    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2455
2456    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2457    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2458
2459    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2460    final ExtendedCell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
2461    final ExtendedCell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2462    final int writeAgainCellByteSize =
2463      MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2);
2464    final Thread writeAgainThread = new Thread(() -> {
2465      try {
2466        myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();
2467
2468        store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
2469        store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());
2470
2471        myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
2472      } catch (Throwable exception) {
2473        exceptionRef.set(exception);
2474      }
2475    });
2476    writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
2477    writeAgainThread.start();
2478
2479    String oldThreadName = Thread.currentThread().getName();
2480    try {
2481      Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
2482      /**
2483       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
2484       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
2485       * would invoke {@link CompactingMemStore#stopCompaction}.
2486       */
2487      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2488      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2489      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2490      writeAgainThread.join();
2491
2492      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2493      assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
2494      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2495      assertTrue(segments.getNumOfSegments() == 1);
2496      assertTrue(
2497        ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
2498      assertTrue(segments.getNumOfCells() == 2);
2499      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
2500      assertTrue(exceptionRef.get() == null);
2501      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2502    } finally {
2503      Thread.currentThread().setName(oldThreadName);
2504    }
2505  }
2506
2507  /**
2508   * <pre>
2509   * This test is for HBASE-26465,
2510   * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute
2511   * concurrently. The threads sequence before HBASE-26465 is:
2512   * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have be added to
2513   *  {@link DefaultMemStore}.
2514   * 2.The flush thread stopping before {@link DefaultMemStore#clearSnapshot} in
2515   *   {@link HStore#updateStorefiles} after completed flushing memStore to hfile.
2516   * 3.The scan thread starts and stopping after {@link DefaultMemStore#getSnapshotSegments} in
2517   *   {@link DefaultMemStore#getScanners},here the scan thread gets the
2518   *   {@link DefaultMemStore#snapshot} which is created by the flush thread.
2519   * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and close
2520   *   {@link DefaultMemStore#snapshot},because the reference count of the corresponding
2521   *   {@link MemStoreLABImpl} is 0, the {@link Chunk}s in corresponding {@link MemStoreLABImpl}
2522   *   are recycled.
2523   * 5.The scan thread continues {@link DefaultMemStore#getScanners},and create a
2524   *   {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increase the
2525   *   reference count of the corresponding {@link MemStoreLABImpl}, but {@link Chunk}s in
2526   *   corresponding {@link MemStoreLABImpl} are recycled by step 4, and these {@link Chunk}s may
2527   *   be overwritten by other write threads,which may cause serious problem.
2528   * After HBASE-26465,{@link DefaultMemStore#getScanners} and
2529   * {@link DefaultMemStore#clearSnapshot} could not execute concurrently.
2530   * </pre>
2531   */
2532  @Test
2533  public void testClearSnapshotGetScannerConcurrently() throws Exception {
2534    Configuration conf = HBaseConfiguration.create();
2535
2536    byte[] smallValue = new byte[3];
2537    byte[] largeValue = new byte[9];
2538    final long timestamp = EnvironmentEdgeManager.currentTime();
2539    final long seqId = 100;
2540    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2541    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2542    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2543    quals.add(qf1);
2544    quals.add(qf2);
2545
2546    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName());
2547    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2548
2549    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2550    MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore);
2551    myDefaultMemStore.store = store;
2552
2553    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2554    store.add(smallCell, memStoreSizing);
2555    store.add(largeCell, memStoreSizing);
2556
2557    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2558    final Thread flushThread = new Thread(() -> {
2559      try {
2560        flushStore(store, id++);
2561      } catch (Throwable exception) {
2562        exceptionRef.set(exception);
2563      }
2564    });
2565    flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME);
2566    flushThread.start();
2567
2568    String oldThreadName = Thread.currentThread().getName();
2569    StoreScanner storeScanner = null;
2570    try {
2571      Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME);
2572
2573      /**
2574       * Wait flush thread stopping before {@link DefaultMemStore#doClearSnapshot}
2575       */
2576      myDefaultMemStore.getScannerCyclicBarrier.await();
2577
2578      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2579      flushThread.join();
2580
2581      if (myDefaultMemStore.shouldWait) {
2582        SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2583        MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2584        assertTrue(memStoreLAB.isClosed());
2585        assertTrue(!memStoreLAB.chunks.isEmpty());
2586        assertTrue(!memStoreLAB.isReclaimed());
2587
2588        ExtendedCell cell1 = segmentScanner.next();
2589        PrivateCellUtil.equals(smallCell, cell1);
2590        ExtendedCell cell2 = segmentScanner.next();
2591        PrivateCellUtil.equals(largeCell, cell2);
2592        assertNull(segmentScanner.next());
2593      } else {
2594        List<ExtendedCell> results = new ArrayList<>();
2595        storeScanner.next(results);
2596        assertEquals(2, results.size());
2597        PrivateCellUtil.equals(smallCell, results.get(0));
2598        PrivateCellUtil.equals(largeCell, results.get(1));
2599      }
2600      assertTrue(exceptionRef.get() == null);
2601    } finally {
2602      if (storeScanner != null) {
2603        storeScanner.close();
2604      }
2605      Thread.currentThread().setName(oldThreadName);
2606    }
2607  }
2608
2609  @SuppressWarnings("unchecked")
2610  private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) {
2611    List<T> resultScanners = new ArrayList<T>();
2612    for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
2613      if (keyValueScannerClass.isInstance(keyValueScanner)) {
2614        resultScanners.add((T) keyValueScanner);
2615      }
2616    }
2617    assertTrue(resultScanners.size() == 1);
2618    return resultScanners.get(0);
2619  }
2620
2621  @Test
2622  public void testOnConfigurationChange() throws IOException {
2623    final int COMMON_MAX_FILES_TO_COMPACT = 10;
2624    final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8;
2625    final int STORE_MAX_FILES_TO_COMPACT = 6;
2626
2627    // Build a table that its maxFileToCompact different from common configuration.
2628    Configuration conf = HBaseConfiguration.create();
2629    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2630      COMMON_MAX_FILES_TO_COMPACT);
2631    conf.setBoolean(CACHE_DATA_ON_READ_KEY, false);
2632    conf.setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true);
2633    conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
2634    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
2635      .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2636        String.valueOf(STORE_MAX_FILES_TO_COMPACT))
2637      .build();
2638    init(this.name.getMethodName(), conf, hcd);
2639
2640    // After updating common configuration, the conf in HStore itself must not be changed.
2641    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2642      NEW_COMMON_MAX_FILES_TO_COMPACT);
2643    this.store.onConfigurationChange(conf);
2644
2645    assertEquals(STORE_MAX_FILES_TO_COMPACT,
2646      store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact());
2647
2648    assertEquals(conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ), false);
2649    assertEquals(conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), true);
2650    assertEquals(conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), true);
2651
2652    // reset to default values
2653    conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ);
2654    conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE);
2655    conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE);
2656    this.store.onConfigurationChange(conf);
2657  }
2658
2659  /**
2660   * This test is for HBASE-26476
2661   */
2662  @Test
2663  public void testExtendsDefaultMemStore() throws Exception {
2664    Configuration conf = HBaseConfiguration.create();
2665    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2666
2667    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2668    assertTrue(this.store.memstore.getClass() == DefaultMemStore.class);
2669    tearDown();
2670
2671    conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName());
2672    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2673    assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class);
2674  }
2675
2676  static class CustomDefaultMemStore extends DefaultMemStore {
2677
2678    public CustomDefaultMemStore(Configuration conf, CellComparator c,
2679      RegionServicesForStores regionServices) {
2680      super(conf, c, regionServices);
2681    }
2682
2683  }
2684
2685  /**
2686   * This test is for HBASE-26488
2687   */
2688  @Test
2689  public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception {
2690    Configuration conf = HBaseConfiguration.create();
2691
2692    byte[] smallValue = new byte[3];
2693    byte[] largeValue = new byte[9];
2694    final long timestamp = EnvironmentEdgeManager.currentTime();
2695    final long seqId = 100;
2696    final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2697    final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2698    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2699    quals.add(qf1);
2700    quals.add(qf2);
2701
2702    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName());
2703    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2704    conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
2705      MyDefaultStoreFlusher.class.getName());
2706
2707    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2708    MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore);
2709    assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher);
2710
2711    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2712    store.add(smallCell, memStoreSizing);
2713    store.add(largeCell, memStoreSizing);
2714    flushStore(store, id++);
2715
2716    MemStoreLABImpl memStoreLAB =
2717      (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
2718    assertTrue(memStoreLAB.isClosed());
2719    assertTrue(memStoreLAB.getRefCntValue() == 0);
2720    assertTrue(memStoreLAB.isReclaimed());
2721    assertTrue(memStoreLAB.chunks.isEmpty());
2722    StoreScanner storeScanner = null;
2723    try {
2724      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2725      assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1);
2726      assertTrue(store.memstore.size().getCellsCount() == 0);
2727      assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0);
2728      assertTrue(storeScanner.currentScanners.size() == 1);
2729      assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner);
2730
2731      List<ExtendedCell> results = new ArrayList<>();
2732      storeScanner.next(results);
2733      assertEquals(2, results.size());
2734      PrivateCellUtil.equals(smallCell, results.get(0));
2735      PrivateCellUtil.equals(largeCell, results.get(1));
2736    } finally {
2737      if (storeScanner != null) {
2738        storeScanner.close();
2739      }
2740    }
2741  }
2742
2743  static class MyDefaultMemStore1 extends DefaultMemStore {
2744
2745    private ImmutableSegment snapshotImmutableSegment;
2746
2747    public MyDefaultMemStore1(Configuration conf, CellComparator c,
2748      RegionServicesForStores regionServices) {
2749      super(conf, c, regionServices);
2750    }
2751
2752    @Override
2753    public MemStoreSnapshot snapshot() {
2754      MemStoreSnapshot result = super.snapshot();
2755      this.snapshotImmutableSegment = snapshot;
2756      return result;
2757    }
2758
2759  }
2760
2761  public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
2762    private static final AtomicInteger failCounter = new AtomicInteger(1);
2763    private static final AtomicInteger counter = new AtomicInteger(0);
2764
2765    public MyDefaultStoreFlusher(Configuration conf, HStore store) {
2766      super(conf, store);
2767    }
2768
2769    @Override
2770    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
2771      MonitoredTask status, ThroughputController throughputController,
2772      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
2773      counter.incrementAndGet();
2774      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
2775        writerCreationTracker);
2776    }
2777
2778    @Override
2779    protected void performFlush(InternalScanner scanner, final CellSink sink,
2780      ThroughputController throughputController) throws IOException {
2781
2782      final int currentCount = counter.get();
2783      CellSink newCellSink = (cell) -> {
2784        if (currentCount <= failCounter.get()) {
2785          throw new IOException("Simulated exception by tests");
2786        }
2787        sink.append(cell);
2788      };
2789      super.performFlush(scanner, newCellSink, throughputController);
2790    }
2791  }
2792
2793  /**
2794   * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB}
2795   */
2796  @Test
2797  public void testImmutableMemStoreLABRefCnt() throws Exception {
2798    Configuration conf = HBaseConfiguration.create();
2799
2800    byte[] smallValue = new byte[3];
2801    byte[] largeValue = new byte[9];
2802    final long timestamp = EnvironmentEdgeManager.currentTime();
2803    final long seqId = 100;
2804    final ExtendedCell smallCell1 = createCell(qf1, timestamp, seqId, smallValue);
2805    final ExtendedCell largeCell1 = createCell(qf2, timestamp, seqId, largeValue);
2806    final ExtendedCell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue);
2807    final ExtendedCell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2808    final ExtendedCell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue);
2809    final ExtendedCell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue);
2810
2811    int smallCellByteSize = MutableSegment.getCellLength(smallCell1);
2812    int largeCellByteSize = MutableSegment.getCellLength(largeCell1);
2813    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2814    int flushByteSize = firstWriteCellByteSize - 2;
2815
2816    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2817    conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
2818    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2819    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2820    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2821
2822    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2823      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2824
2825    final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore);
2826    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2827    myCompactingMemStore.allowCompaction.set(false);
2828
2829    NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2830    store.add(smallCell1, memStoreSizing);
2831    store.add(largeCell1, memStoreSizing);
2832    store.add(smallCell2, memStoreSizing);
2833    store.add(largeCell2, memStoreSizing);
2834    store.add(smallCell3, memStoreSizing);
2835    store.add(largeCell3, memStoreSizing);
2836    VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
2837    assertTrue(versionedSegmentsList.getNumOfSegments() == 3);
2838    List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments();
2839    List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size());
2840    for (ImmutableSegment segment : segments) {
2841      memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB());
2842    }
2843    List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
2844    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2845      assertTrue(memStoreLAB.getRefCntValue() == 2);
2846    }
2847
2848    myCompactingMemStore.allowCompaction.set(true);
2849    myCompactingMemStore.flushInMemory();
2850
2851    versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
2852    assertTrue(versionedSegmentsList.getNumOfSegments() == 1);
2853    ImmutableMemStoreLAB immutableMemStoreLAB =
2854      (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB());
2855    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2856      assertTrue(memStoreLAB.getRefCntValue() == 2);
2857    }
2858
2859    List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
2860    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2861      assertTrue(memStoreLAB.getRefCntValue() == 2);
2862    }
2863    assertTrue(immutableMemStoreLAB.getRefCntValue() == 2);
2864    for (KeyValueScanner scanner : scanners1) {
2865      scanner.close();
2866    }
2867    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2868      assertTrue(memStoreLAB.getRefCntValue() == 1);
2869    }
2870    for (KeyValueScanner scanner : scanners2) {
2871      scanner.close();
2872    }
2873    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2874      assertTrue(memStoreLAB.getRefCntValue() == 1);
2875    }
2876    assertTrue(immutableMemStoreLAB.getRefCntValue() == 1);
2877    flushStore(store, id++);
2878    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2879      assertTrue(memStoreLAB.getRefCntValue() == 0);
2880    }
2881    assertTrue(immutableMemStoreLAB.getRefCntValue() == 0);
2882    assertTrue(immutableMemStoreLAB.isClosed());
2883    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2884      assertTrue(memStoreLAB.isClosed());
2885      assertTrue(memStoreLAB.isReclaimed());
2886      assertTrue(memStoreLAB.chunks.isEmpty());
2887    }
2888  }
2889
2890  private HStoreFile mockStoreFileWithLength(long length) {
2891    HStoreFile sf = mock(HStoreFile.class);
2892    StoreFileReader sfr = mock(StoreFileReader.class);
2893    when(sf.isHFile()).thenReturn(true);
2894    when(sf.getReader()).thenReturn(sfr);
2895    when(sfr.length()).thenReturn(length);
2896    return sf;
2897  }
2898
2899  private static class MyThread extends Thread {
2900    private StoreScanner scanner;
2901    private KeyValueHeap heap;
2902
2903    public MyThread(StoreScanner scanner) {
2904      this.scanner = scanner;
2905    }
2906
2907    public KeyValueHeap getHeap() {
2908      return this.heap;
2909    }
2910
2911    @Override
2912    public void run() {
2913      scanner.trySwitchToStreamRead();
2914      heap = scanner.heap;
2915    }
2916  }
2917
2918  private static class MyMemStoreCompactor extends MemStoreCompactor {
2919    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2920    private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
2921
2922    public MyMemStoreCompactor(CompactingMemStore compactingMemStore,
2923      MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
2924      super(compactingMemStore, compactionPolicy);
2925    }
2926
2927    @Override
2928    public boolean start() throws IOException {
2929      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
2930      if (isFirst) {
2931        try {
2932          START_COMPACTOR_LATCH.await();
2933          return super.start();
2934        } catch (InterruptedException ex) {
2935          throw new RuntimeException(ex);
2936        }
2937      }
2938      return super.start();
2939    }
2940  }
2941
2942  public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
2943    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2944
2945    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
2946      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2947      throws IOException {
2948      super(conf, c, store, regionServices, compactionPolicy);
2949    }
2950
2951    @Override
2952    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
2953      throws IllegalArgumentIOException {
2954      return new MyMemStoreCompactor(this, compactionPolicy);
2955    }
2956
2957    @Override
2958    protected boolean setInMemoryCompactionFlag() {
2959      boolean rval = super.setInMemoryCompactionFlag();
2960      if (rval) {
2961        RUNNER_COUNT.incrementAndGet();
2962        if (LOG.isDebugEnabled()) {
2963          LOG.debug("runner count: " + RUNNER_COUNT.get());
2964        }
2965      }
2966      return rval;
2967    }
2968  }
2969
2970  public static class MyCompactingMemStore extends CompactingMemStore {
2971    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
2972    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
2973    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
2974
2975    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store,
2976      RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2977      throws IOException {
2978      super(conf, c, store, regionServices, compactionPolicy);
2979    }
2980
2981    @Override
2982    protected List<KeyValueScanner> createList(int capacity) {
2983      if (START_TEST.get()) {
2984        try {
2985          getScannerLatch.countDown();
2986          snapshotLatch.await();
2987        } catch (InterruptedException e) {
2988          throw new RuntimeException(e);
2989        }
2990      }
2991      return new ArrayList<>(capacity);
2992    }
2993
2994    @Override
2995    protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) {
2996      if (START_TEST.get()) {
2997        try {
2998          getScannerLatch.await();
2999        } catch (InterruptedException e) {
3000          throw new RuntimeException(e);
3001        }
3002      }
3003
3004      super.pushActiveToPipeline(active, checkEmpty);
3005      if (START_TEST.get()) {
3006        snapshotLatch.countDown();
3007      }
3008    }
3009  }
3010
3011  interface MyListHook {
3012    void hook(int currentSize);
3013  }
3014
3015  private static class MyList<T> implements List<T> {
3016    private final List<T> delegatee = new ArrayList<>();
3017    private final MyListHook hookAtAdd;
3018
3019    MyList(final MyListHook hookAtAdd) {
3020      this.hookAtAdd = hookAtAdd;
3021    }
3022
3023    @Override
3024    public int size() {
3025      return delegatee.size();
3026    }
3027
3028    @Override
3029    public boolean isEmpty() {
3030      return delegatee.isEmpty();
3031    }
3032
3033    @Override
3034    public boolean contains(Object o) {
3035      return delegatee.contains(o);
3036    }
3037
3038    @Override
3039    public Iterator<T> iterator() {
3040      return delegatee.iterator();
3041    }
3042
3043    @Override
3044    public Object[] toArray() {
3045      return delegatee.toArray();
3046    }
3047
3048    @Override
3049    public <R> R[] toArray(R[] a) {
3050      return delegatee.toArray(a);
3051    }
3052
3053    @Override
3054    public boolean add(T e) {
3055      hookAtAdd.hook(size());
3056      return delegatee.add(e);
3057    }
3058
3059    @Override
3060    public boolean remove(Object o) {
3061      return delegatee.remove(o);
3062    }
3063
3064    @Override
3065    public boolean containsAll(Collection<?> c) {
3066      return delegatee.containsAll(c);
3067    }
3068
3069    @Override
3070    public boolean addAll(Collection<? extends T> c) {
3071      return delegatee.addAll(c);
3072    }
3073
3074    @Override
3075    public boolean addAll(int index, Collection<? extends T> c) {
3076      return delegatee.addAll(index, c);
3077    }
3078
3079    @Override
3080    public boolean removeAll(Collection<?> c) {
3081      return delegatee.removeAll(c);
3082    }
3083
3084    @Override
3085    public boolean retainAll(Collection<?> c) {
3086      return delegatee.retainAll(c);
3087    }
3088
3089    @Override
3090    public void clear() {
3091      delegatee.clear();
3092    }
3093
3094    @Override
3095    public T get(int index) {
3096      return delegatee.get(index);
3097    }
3098
3099    @Override
3100    public T set(int index, T element) {
3101      return delegatee.set(index, element);
3102    }
3103
3104    @Override
3105    public void add(int index, T element) {
3106      delegatee.add(index, element);
3107    }
3108
3109    @Override
3110    public T remove(int index) {
3111      return delegatee.remove(index);
3112    }
3113
3114    @Override
3115    public int indexOf(Object o) {
3116      return delegatee.indexOf(o);
3117    }
3118
3119    @Override
3120    public int lastIndexOf(Object o) {
3121      return delegatee.lastIndexOf(o);
3122    }
3123
3124    @Override
3125    public ListIterator<T> listIterator() {
3126      return delegatee.listIterator();
3127    }
3128
3129    @Override
3130    public ListIterator<T> listIterator(int index) {
3131      return delegatee.listIterator(index);
3132    }
3133
3134    @Override
3135    public List<T> subList(int fromIndex, int toIndex) {
3136      return delegatee.subList(fromIndex, toIndex);
3137    }
3138  }
3139
3140  public static class MyCompactingMemStore2 extends CompactingMemStore {
3141    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
3142    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
3143    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
3144    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
3145    private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
3146    private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
3147
3148    public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
3149      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3150      throws IOException {
3151      super(conf, cellComparator, store, regionServices, compactionPolicy);
3152    }
3153
3154    @Override
3155    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
3156      MemStoreSizing memstoreSizing) {
3157      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3158        int currentCount = largeCellPreUpdateCounter.incrementAndGet();
3159        if (currentCount <= 1) {
3160          try {
3161            /**
3162             * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
3163             * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
3164             * largeCellThread invokes flushInMemory.
3165             */
3166            preCyclicBarrier.await();
3167          } catch (Throwable e) {
3168            throw new RuntimeException(e);
3169          }
3170        }
3171      }
3172
3173      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3174      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3175        try {
3176          preCyclicBarrier.await();
3177        } catch (Throwable e) {
3178          throw new RuntimeException(e);
3179        }
3180      }
3181      return returnValue;
3182    }
3183
3184    @Override
3185    protected void doAdd(MutableSegment currentActive, ExtendedCell cell,
3186      MemStoreSizing memstoreSizing) {
3187      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3188        try {
3189          /**
3190           * After largeCellThread finished flushInMemory method, smallCellThread can add cell to
3191           * currentActive . That is to say when largeCellThread called flushInMemory method,
3192           * currentActive has no cell.
3193           */
3194          postCyclicBarrier.await();
3195        } catch (Throwable e) {
3196          throw new RuntimeException(e);
3197        }
3198      }
3199      super.doAdd(currentActive, cell, memstoreSizing);
3200    }
3201
3202    @Override
3203    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
3204      super.flushInMemory(currentActiveMutableSegment);
3205      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3206        if (largeCellPreUpdateCounter.get() <= 1) {
3207          try {
3208            postCyclicBarrier.await();
3209          } catch (Throwable e) {
3210            throw new RuntimeException(e);
3211          }
3212        }
3213      }
3214    }
3215
3216  }
3217
3218  public static class MyCompactingMemStore3 extends CompactingMemStore {
3219    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
3220    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
3221
3222    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
3223    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
3224    private final AtomicInteger flushCounter = new AtomicInteger(0);
3225    private static final int CELL_COUNT = 5;
3226    private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;
3227
3228    public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator,
3229      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3230      throws IOException {
3231      super(conf, cellComparator, store, regionServices, compactionPolicy);
3232    }
3233
3234    @Override
3235    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
3236      MemStoreSizing memstoreSizing) {
3237      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3238        return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3239      }
3240      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3241        try {
3242          preCyclicBarrier.await();
3243        } catch (Throwable e) {
3244          throw new RuntimeException(e);
3245        }
3246      }
3247
3248      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3249      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3250        try {
3251          preCyclicBarrier.await();
3252        } catch (Throwable e) {
3253          throw new RuntimeException(e);
3254        }
3255      }
3256      return returnValue;
3257    }
3258
3259    @Override
3260    protected void postUpdate(MutableSegment currentActiveMutableSegment) {
3261      super.postUpdate(currentActiveMutableSegment);
3262      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3263        try {
3264          postCyclicBarrier.await();
3265        } catch (Throwable e) {
3266          throw new RuntimeException(e);
3267        }
3268        return;
3269      }
3270
3271      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3272        try {
3273          postCyclicBarrier.await();
3274        } catch (Throwable e) {
3275          throw new RuntimeException(e);
3276        }
3277      }
3278    }
3279
3280    @Override
3281    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
3282      super.flushInMemory(currentActiveMutableSegment);
3283      flushCounter.incrementAndGet();
3284      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3285        return;
3286      }
3287
3288      assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME));
3289      try {
3290        postCyclicBarrier.await();
3291      } catch (Throwable e) {
3292        throw new RuntimeException(e);
3293      }
3294
3295    }
3296
3297    void disableCompaction() {
3298      allowCompaction.set(false);
3299    }
3300
3301    void enableCompaction() {
3302      allowCompaction.set(true);
3303    }
3304
3305  }
3306
3307  public static class MyCompactingMemStore4 extends CompactingMemStore {
3308    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
3309    /**
3310     * {@link CompactingMemStore#flattenOneSegment} must execute after
3311     * {@link CompactingMemStore#getImmutableSegments}
3312     */
3313    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
3314    /**
3315     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
3316     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
3317     */
3318    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
3319    /**
3320     * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
3321     * snapshot thread starts {@link CompactingMemStore#snapshot},because
3322     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
3323     */
3324    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
3325    /**
3326     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
3327     */
3328    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3329    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3330    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3331    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3332    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
3333
3334    public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
3335      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3336      throws IOException {
3337      super(conf, cellComparator, store, regionServices, compactionPolicy);
3338    }
3339
3340    @Override
3341    public VersionedSegmentsList getImmutableSegments() {
3342      VersionedSegmentsList result = super.getImmutableSegments();
3343      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3344        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3345        if (currentCount <= 1) {
3346          try {
3347            flattenOneSegmentPreCyclicBarrier.await();
3348          } catch (Throwable e) {
3349            throw new RuntimeException(e);
3350          }
3351        }
3352      }
3353      return result;
3354    }
3355
3356    @Override
3357    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3358      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3359        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3360        if (currentCount <= 1) {
3361          try {
3362            flattenOneSegmentPostCyclicBarrier.await();
3363          } catch (Throwable e) {
3364            throw new RuntimeException(e);
3365          }
3366        }
3367      }
3368      boolean result = super.swapPipelineWithNull(segments);
3369      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3370        int currentCount = swapPipelineWithNullCounter.get();
3371        if (currentCount <= 1) {
3372          assertTrue(!result);
3373        }
3374        if (currentCount == 2) {
3375          assertTrue(result);
3376        }
3377      }
3378      return result;
3379
3380    }
3381
3382    @Override
3383    public void flattenOneSegment(long requesterVersion, Action action) {
3384      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3385      if (currentCount <= 1) {
3386        try {
3387          /**
3388           * {@link CompactingMemStore#snapshot} could start.
3389           */
3390          snapShotStartCyclicCyclicBarrier.await();
3391          flattenOneSegmentPreCyclicBarrier.await();
3392        } catch (Throwable e) {
3393          throw new RuntimeException(e);
3394        }
3395      }
3396      super.flattenOneSegment(requesterVersion, action);
3397      if (currentCount <= 1) {
3398        try {
3399          flattenOneSegmentPostCyclicBarrier.await();
3400        } catch (Throwable e) {
3401          throw new RuntimeException(e);
3402        }
3403      }
3404    }
3405
3406    @Override
3407    protected boolean setInMemoryCompactionFlag() {
3408      boolean result = super.setInMemoryCompactionFlag();
3409      assertTrue(result);
3410      setInMemoryCompactionFlagCounter.incrementAndGet();
3411      return result;
3412    }
3413
3414    @Override
3415    void inMemoryCompaction() {
3416      try {
3417        super.inMemoryCompaction();
3418      } finally {
3419        try {
3420          inMemoryCompactionEndCyclicBarrier.await();
3421        } catch (Throwable e) {
3422          throw new RuntimeException(e);
3423        }
3424
3425      }
3426    }
3427
3428  }
3429
3430  public static class MyCompactingMemStore5 extends CompactingMemStore {
3431    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
3432    private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
3433    /**
3434     * {@link CompactingMemStore#flattenOneSegment} must execute after
3435     * {@link CompactingMemStore#getImmutableSegments}
3436     */
3437    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
3438    /**
3439     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
3440     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
3441     */
3442    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
3443    /**
3444     * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
3445     * snapshot thread starts {@link CompactingMemStore#snapshot},because
3446     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
3447     */
3448    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
3449    /**
3450     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
3451     */
3452    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3453    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3454    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3455    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3456    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
3457    /**
3458     * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain
3459     * thread could start.
3460     */
3461    private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
3462    /**
3463     * This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the
3464     * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would
3465     * execute,and in memory compact thread would exit,because we expect that in memory compact
3466     * executing only once.
3467     */
3468    private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);
3469
3470    public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
3471      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3472      throws IOException {
3473      super(conf, cellComparator, store, regionServices, compactionPolicy);
3474    }
3475
3476    @Override
3477    public VersionedSegmentsList getImmutableSegments() {
3478      VersionedSegmentsList result = super.getImmutableSegments();
3479      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3480        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3481        if (currentCount <= 1) {
3482          try {
3483            flattenOneSegmentPreCyclicBarrier.await();
3484          } catch (Throwable e) {
3485            throw new RuntimeException(e);
3486          }
3487        }
3488
3489      }
3490
3491      return result;
3492    }
3493
3494    @Override
3495    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3496      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3497        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3498        if (currentCount <= 1) {
3499          try {
3500            flattenOneSegmentPostCyclicBarrier.await();
3501          } catch (Throwable e) {
3502            throw new RuntimeException(e);
3503          }
3504        }
3505
3506        if (currentCount == 2) {
3507          try {
3508            /**
3509             * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull},
3510             * writeAgain thread could start.
3511             */
3512            writeMemStoreAgainStartCyclicBarrier.await();
3513            /**
3514             * Only the writeAgain thread completes, retry
3515             * {@link CompactingMemStore#swapPipelineWithNull} would execute.
3516             */
3517            writeMemStoreAgainEndCyclicBarrier.await();
3518          } catch (Throwable e) {
3519            throw new RuntimeException(e);
3520          }
3521        }
3522
3523      }
3524      boolean result = super.swapPipelineWithNull(segments);
3525      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3526        int currentCount = swapPipelineWithNullCounter.get();
3527        if (currentCount <= 1) {
3528          assertTrue(!result);
3529        }
3530        if (currentCount == 2) {
3531          assertTrue(result);
3532        }
3533      }
3534      return result;
3535
3536    }
3537
3538    @Override
3539    public void flattenOneSegment(long requesterVersion, Action action) {
3540      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3541      if (currentCount <= 1) {
3542        try {
3543          /**
3544           * {@link CompactingMemStore#snapshot} could start.
3545           */
3546          snapShotStartCyclicCyclicBarrier.await();
3547          flattenOneSegmentPreCyclicBarrier.await();
3548        } catch (Throwable e) {
3549          throw new RuntimeException(e);
3550        }
3551      }
3552      super.flattenOneSegment(requesterVersion, action);
3553      if (currentCount <= 1) {
3554        try {
3555          flattenOneSegmentPostCyclicBarrier.await();
3556          /**
3557           * Only the writeAgain thread completes, in memory compact thread would exit,because we
3558           * expect that in memory compact executing only once.
3559           */
3560          writeMemStoreAgainEndCyclicBarrier.await();
3561        } catch (Throwable e) {
3562          throw new RuntimeException(e);
3563        }
3564
3565      }
3566    }
3567
3568    @Override
3569    protected boolean setInMemoryCompactionFlag() {
3570      boolean result = super.setInMemoryCompactionFlag();
3571      int count = setInMemoryCompactionFlagCounter.incrementAndGet();
3572      if (count <= 1) {
3573        assertTrue(result);
3574      }
3575      if (count == 2) {
3576        assertTrue(!result);
3577      }
3578      return result;
3579    }
3580
3581    @Override
3582    void inMemoryCompaction() {
3583      try {
3584        super.inMemoryCompaction();
3585      } finally {
3586        try {
3587          inMemoryCompactionEndCyclicBarrier.await();
3588        } catch (Throwable e) {
3589          throw new RuntimeException(e);
3590        }
3591
3592      }
3593    }
3594  }
3595
3596  public static class MyCompactingMemStore6 extends CompactingMemStore {
3597    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3598
3599    public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator,
3600      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3601      throws IOException {
3602      super(conf, cellComparator, store, regionServices, compactionPolicy);
3603    }
3604
3605    @Override
3606    void inMemoryCompaction() {
3607      try {
3608        super.inMemoryCompaction();
3609      } finally {
3610        try {
3611          inMemoryCompactionEndCyclicBarrier.await();
3612        } catch (Throwable e) {
3613          throw new RuntimeException(e);
3614        }
3615
3616      }
3617    }
3618  }
3619
3620  public static class MyDefaultMemStore extends DefaultMemStore {
3621    private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread";
3622    private static final String FLUSH_THREAD_NAME = "flushMyThread";
3623    /**
3624     * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner thread
3625     * could start.
3626     */
3627    private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2);
3628    /**
3629     * Used by getScanner thread notifies flush thread {@link DefaultMemStore#getSnapshotSegments}
3630     * completed, {@link DefaultMemStore#doClearSnapShot} could continue.
3631     */
3632    private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3633    /**
3634     * Used by flush thread notifies getScanner thread {@link DefaultMemStore#doClearSnapShot}
3635     * completed, {@link DefaultMemStore#getScanners} could continue.
3636     */
3637    private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3638    private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0);
3639    private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0);
3640    private volatile boolean shouldWait = true;
3641    private volatile HStore store = null;
3642
3643    public MyDefaultMemStore(Configuration conf, CellComparator cellComparator,
3644      RegionServicesForStores regionServices) throws IOException {
3645      super(conf, cellComparator, regionServices);
3646    }
3647
3648    @Override
3649    protected List<Segment> getSnapshotSegments() {
3650
3651      List<Segment> result = super.getSnapshotSegments();
3652
3653      if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) {
3654        int currentCount = getSnapshotSegmentsCounter.incrementAndGet();
3655        if (currentCount == 1) {
3656          if (this.shouldWait) {
3657            try {
3658              /**
3659               * Notify flush thread {@link DefaultMemStore#getSnapshotSegments} completed,
3660               * {@link DefaultMemStore#doClearSnapShot} could continue.
3661               */
3662              preClearSnapShotCyclicBarrier.await();
3663              /**
3664               * Wait for {@link DefaultMemStore#doClearSnapShot} completed.
3665               */
3666              postClearSnapShotCyclicBarrier.await();
3667
3668            } catch (Throwable e) {
3669              throw new RuntimeException(e);
3670            }
3671          }
3672        }
3673      }
3674      return result;
3675    }
3676
3677    @Override
3678    protected void doClearSnapShot() {
3679      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3680        int currentCount = clearSnapshotCounter.incrementAndGet();
3681        if (currentCount == 1) {
3682          try {
3683            if (
3684              ((ReentrantReadWriteLock) store.getStoreEngine().getLock())
3685                .isWriteLockedByCurrentThread()
3686            ) {
3687              shouldWait = false;
3688            }
3689            /**
3690             * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner
3691             * thread could start.
3692             */
3693            getScannerCyclicBarrier.await();
3694
3695            if (shouldWait) {
3696              /**
3697               * Wait for {@link DefaultMemStore#getSnapshotSegments} completed.
3698               */
3699              preClearSnapShotCyclicBarrier.await();
3700            }
3701          } catch (Throwable e) {
3702            throw new RuntimeException(e);
3703          }
3704        }
3705      }
3706      super.doClearSnapShot();
3707
3708      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3709        int currentCount = clearSnapshotCounter.get();
3710        if (currentCount == 1) {
3711          if (shouldWait) {
3712            try {
3713              /**
3714               * Notify getScanner thread {@link DefaultMemStore#doClearSnapShot} completed,
3715               * {@link DefaultMemStore#getScanners} could continue.
3716               */
3717              postClearSnapShotCyclicBarrier.await();
3718            } catch (Throwable e) {
3719              throw new RuntimeException(e);
3720            }
3721          }
3722        }
3723      }
3724    }
3725  }
3726}