001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY;
021import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_ON_READ_KEY;
022import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_READ;
023import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_WRITE;
024import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE;
025import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
026import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
027import static org.junit.Assert.assertArrayEquals;
028import static org.junit.Assert.assertEquals;
029import static org.junit.Assert.assertFalse;
030import static org.junit.Assert.assertNull;
031import static org.junit.Assert.assertTrue;
032import static org.junit.Assert.fail;
033import static org.mockito.ArgumentMatchers.any;
034import static org.mockito.Mockito.mock;
035import static org.mockito.Mockito.spy;
036import static org.mockito.Mockito.times;
037import static org.mockito.Mockito.verify;
038import static org.mockito.Mockito.when;
039
040import java.io.FileNotFoundException;
041import java.io.IOException;
042import java.lang.ref.SoftReference;
043import java.security.PrivilegedExceptionAction;
044import java.util.ArrayList;
045import java.util.Arrays;
046import java.util.Collection;
047import java.util.Collections;
048import java.util.Iterator;
049import java.util.List;
050import java.util.ListIterator;
051import java.util.NavigableSet;
052import java.util.Optional;
053import java.util.TreeSet;
054import java.util.concurrent.BrokenBarrierException;
055import java.util.concurrent.ConcurrentSkipListSet;
056import java.util.concurrent.CountDownLatch;
057import java.util.concurrent.CyclicBarrier;
058import java.util.concurrent.ExecutorService;
059import java.util.concurrent.Executors;
060import java.util.concurrent.Future;
061import java.util.concurrent.ThreadPoolExecutor;
062import java.util.concurrent.TimeUnit;
063import java.util.concurrent.atomic.AtomicBoolean;
064import java.util.concurrent.atomic.AtomicInteger;
065import java.util.concurrent.atomic.AtomicLong;
066import java.util.concurrent.atomic.AtomicReference;
067import java.util.concurrent.locks.ReentrantReadWriteLock;
068import java.util.function.Consumer;
069import java.util.function.IntBinaryOperator;
070import org.apache.hadoop.conf.Configuration;
071import org.apache.hadoop.fs.FSDataOutputStream;
072import org.apache.hadoop.fs.FileStatus;
073import org.apache.hadoop.fs.FileSystem;
074import org.apache.hadoop.fs.FilterFileSystem;
075import org.apache.hadoop.fs.LocalFileSystem;
076import org.apache.hadoop.fs.Path;
077import org.apache.hadoop.fs.permission.FsPermission;
078import org.apache.hadoop.hbase.Cell;
079import org.apache.hadoop.hbase.CellBuilderFactory;
080import org.apache.hadoop.hbase.CellBuilderType;
081import org.apache.hadoop.hbase.CellComparator;
082import org.apache.hadoop.hbase.CellComparatorImpl;
083import org.apache.hadoop.hbase.CellUtil;
084import org.apache.hadoop.hbase.ExtendedCell;
085import org.apache.hadoop.hbase.HBaseClassTestRule;
086import org.apache.hadoop.hbase.HBaseConfiguration;
087import org.apache.hadoop.hbase.HBaseTestingUtility;
088import org.apache.hadoop.hbase.HConstants;
089import org.apache.hadoop.hbase.KeyValue;
090import org.apache.hadoop.hbase.MemoryCompactionPolicy;
091import org.apache.hadoop.hbase.NamespaceDescriptor;
092import org.apache.hadoop.hbase.PrivateCellUtil;
093import org.apache.hadoop.hbase.TableName;
094import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
095import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
096import org.apache.hadoop.hbase.client.Get;
097import org.apache.hadoop.hbase.client.RegionInfo;
098import org.apache.hadoop.hbase.client.RegionInfoBuilder;
099import org.apache.hadoop.hbase.client.Scan;
100import org.apache.hadoop.hbase.client.Scan.ReadType;
101import org.apache.hadoop.hbase.client.TableDescriptor;
102import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
103import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
104import org.apache.hadoop.hbase.filter.Filter;
105import org.apache.hadoop.hbase.filter.FilterBase;
106import org.apache.hadoop.hbase.io.compress.Compression;
107import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
108import org.apache.hadoop.hbase.io.hfile.CacheConfig;
109import org.apache.hadoop.hbase.io.hfile.HFile;
110import org.apache.hadoop.hbase.io.hfile.HFileContext;
111import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
112import org.apache.hadoop.hbase.monitoring.MonitoredTask;
113import org.apache.hadoop.hbase.nio.RefCnt;
114import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
115import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
116import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
117import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
118import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
119import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
120import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy;
121import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
122import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
123import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
124import org.apache.hadoop.hbase.security.User;
125import org.apache.hadoop.hbase.testclassification.MediumTests;
126import org.apache.hadoop.hbase.testclassification.RegionServerTests;
127import org.apache.hadoop.hbase.util.BloomFilterUtil;
128import org.apache.hadoop.hbase.util.Bytes;
129import org.apache.hadoop.hbase.util.CommonFSUtils;
130import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
131import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
132import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
133import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
134import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
135import org.apache.hadoop.hbase.wal.WALFactory;
136import org.apache.hadoop.util.Progressable;
137import org.junit.After;
138import org.junit.AfterClass;
139import org.junit.Before;
140import org.junit.ClassRule;
141import org.junit.Rule;
142import org.junit.Test;
143import org.junit.experimental.categories.Category;
144import org.junit.rules.TestName;
145import org.mockito.Mockito;
146import org.slf4j.Logger;
147import org.slf4j.LoggerFactory;
148
149import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
150
151/**
152 * Test class for the HStore
153 */
154@Category({ RegionServerTests.class, MediumTests.class })
155public class TestHStore {
156
157  @ClassRule
158  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHStore.class);
159
160  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
161  @Rule
162  public TestName name = new TestName();
163
164  HRegion region;
165  HStore store;
166  byte[] table = Bytes.toBytes("table");
167  byte[] family = Bytes.toBytes("family");
168
169  byte[] row = Bytes.toBytes("row");
170  byte[] row2 = Bytes.toBytes("row2");
171  byte[] qf1 = Bytes.toBytes("qf1");
172  byte[] qf2 = Bytes.toBytes("qf2");
173  byte[] qf3 = Bytes.toBytes("qf3");
174  byte[] qf4 = Bytes.toBytes("qf4");
175  byte[] qf5 = Bytes.toBytes("qf5");
176  byte[] qf6 = Bytes.toBytes("qf6");
177
178  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);
179
180  List<Cell> expected = new ArrayList<>();
181  List<Cell> result = new ArrayList<>();
182
183  long id = EnvironmentEdgeManager.currentTime();
184  Get get = new Get(row);
185
186  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
187  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
188
189  @Before
190  public void setUp() throws IOException {
191    qualifiers.clear();
192    qualifiers.add(qf1);
193    qualifiers.add(qf3);
194    qualifiers.add(qf5);
195
196    Iterator<byte[]> iter = qualifiers.iterator();
197    while (iter.hasNext()) {
198      byte[] next = iter.next();
199      expected.add(new KeyValue(row, family, next, 1, (byte[]) null));
200      get.addColumn(family, next);
201    }
202  }
203
204  private void init(String methodName) throws IOException {
205    init(methodName, TEST_UTIL.getConfiguration());
206  }
207
208  private HStore init(String methodName, Configuration conf) throws IOException {
209    // some of the tests write 4 versions and then flush
210    // (with HBASE-4241, lower versions are collected on flush)
211    return init(methodName, conf,
212      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
213  }
214
215  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
216    throws IOException {
217    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
218  }
219
220  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
221    ColumnFamilyDescriptor hcd) throws IOException {
222    return init(methodName, conf, builder, hcd, null);
223  }
224
225  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
226    ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
227    return init(methodName, conf, builder, hcd, hook, false);
228  }
229
230  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
231    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
232    TableDescriptor htd = builder.setColumnFamily(hcd).build();
233    Path basedir = new Path(DIR + methodName);
234    Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
235    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));
236
237    FileSystem fs = FileSystem.get(conf);
238
239    fs.delete(logdir, true);
240    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
241      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null,
242      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
243    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
244    Configuration walConf = new Configuration(conf);
245    CommonFSUtils.setRootDir(walConf, basedir);
246    WALFactory wals = new WALFactory(walConf, methodName);
247    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
248      htd, null);
249    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
250    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
251    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
252  }
253
254  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
255    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
256    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
257    if (hook == null) {
258      store = new HStore(region, hcd, conf, false);
259    } else {
260      store = new MyStore(region, hcd, conf, hook, switchToPread);
261    }
262    region.stores.put(store.getColumnFamilyDescriptor().getName(), store);
263    return store;
264  }
265
266  /**
267   * Test we do not lose data if we fail a flush and then close. Part of HBase-10466
268   */
269  @Test
270  public void testFlushSizeSizing() throws Exception {
271    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
272    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
273    // Only retry once.
274    conf.setInt("hbase.hstore.flush.retries.number", 1);
275    User user = User.createUserForTesting(conf, this.name.getMethodName(), new String[] { "foo" });
276    // Inject our faulty LocalFileSystem
277    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
278    user.runAs(new PrivilegedExceptionAction<Object>() {
279      @Override
280      public Object run() throws Exception {
281        // Make sure it worked (above is sensitive to caching details in hadoop core)
282        FileSystem fs = FileSystem.get(conf);
283        assertEquals(FaultyFileSystem.class, fs.getClass());
284        FaultyFileSystem ffs = (FaultyFileSystem) fs;
285
286        // Initialize region
287        init(name.getMethodName(), conf);
288
289        MemStoreSize mss = store.memstore.getFlushableSize();
290        assertEquals(0, mss.getDataSize());
291        LOG.info("Adding some data");
292        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
293        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
294        // add the heap size of active (mutable) segment
295        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
296        mss = store.memstore.getFlushableSize();
297        assertEquals(kvSize.getMemStoreSize(), mss);
298        // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
299        try {
300          LOG.info("Flushing");
301          flushStore(store, id++);
302          fail("Didn't bubble up IOE!");
303        } catch (IOException ioe) {
304          assertTrue(ioe.getMessage().contains("Fault injected"));
305        }
306        // due to snapshot, change mutable to immutable segment
307        kvSize.incMemStoreSize(0,
308          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
309        mss = store.memstore.getFlushableSize();
310        assertEquals(kvSize.getMemStoreSize(), mss);
311        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
312        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
313        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
314        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
315        // not yet cleared the snapshot -- the above flush failed.
316        assertEquals(kvSize.getMemStoreSize(), mss);
317        ffs.fault.set(false);
318        flushStore(store, id++);
319        mss = store.memstore.getFlushableSize();
320        // Size should be the foreground kv size.
321        assertEquals(kvSize2.getMemStoreSize(), mss);
322        flushStore(store, id++);
323        mss = store.memstore.getFlushableSize();
324        assertEquals(0, mss.getDataSize());
325        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
326        return null;
327      }
328    });
329  }
330
331  @Test
332  public void testStoreBloomFilterMetricsWithBloomRowCol() throws IOException {
333    int numStoreFiles = 5;
334    writeAndRead(BloomType.ROWCOL, numStoreFiles);
335
336    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
337    // hard to know exactly the numbers here, we are just trying to
338    // prove that they are incrementing
339    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
340    assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
341  }
342
343  @Test
344  public void testStoreBloomFilterMetricsWithBloomRow() throws IOException {
345    int numStoreFiles = 5;
346    writeAndRead(BloomType.ROWCOL, numStoreFiles);
347
348    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
349    // hard to know exactly the numbers here, we are just trying to
350    // prove that they are incrementing
351    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
352    assertTrue(store.getBloomFilterNegativeResultsCount() > 0);
353  }
354
355  @Test
356  public void testStoreBloomFilterMetricsWithBloomRowPrefix() throws IOException {
357    int numStoreFiles = 5;
358    writeAndRead(BloomType.ROWPREFIX_FIXED_LENGTH, numStoreFiles);
359
360    assertEquals(0, store.getBloomFilterEligibleRequestsCount());
361    // hard to know exactly the numbers here, we are just trying to
362    // prove that they are incrementing
363    assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles);
364  }
365
366  @Test
367  public void testStoreBloomFilterMetricsWithBloomNone() throws IOException {
368    int numStoreFiles = 5;
369    writeAndRead(BloomType.NONE, numStoreFiles);
370
371    assertEquals(0, store.getBloomFilterRequestsCount());
372    assertEquals(0, store.getBloomFilterNegativeResultsCount());
373
374    // hard to know exactly the numbers here, we are just trying to
375    // prove that they are incrementing
376    assertTrue(store.getBloomFilterEligibleRequestsCount() >= numStoreFiles);
377  }
378
379  private void writeAndRead(BloomType bloomType, int numStoreFiles) throws IOException {
380    Configuration conf = HBaseConfiguration.create();
381    FileSystem fs = FileSystem.get(conf);
382
383    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
384      .setCompressionType(Compression.Algorithm.GZ).setBloomFilterType(bloomType)
385      .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "3").build();
386    init(name.getMethodName(), conf, hcd);
387
388    for (int i = 1; i <= numStoreFiles; i++) {
389      byte[] row = Bytes.toBytes("row" + i);
390      LOG.info("Adding some data for the store file #" + i);
391      long timeStamp = EnvironmentEdgeManager.currentTime();
392      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
393      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
394      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
395      flush(i);
396    }
397
398    // Verify the total number of store files
399    assertEquals(numStoreFiles, this.store.getStorefiles().size());
400
401    TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
402    columns.add(qf1);
403
404    for (int i = 1; i <= numStoreFiles; i++) {
405      KeyValueScanner scanner =
406        store.getScanner(new Scan(new Get(Bytes.toBytes("row" + i))), columns, 0);
407      scanner.peek();
408    }
409  }
410
411  /**
412   * Verify that compression and data block encoding are respected by the createWriter method, used
413   * on store flush.
414   */
415  @Test
416  public void testCreateWriter() throws Exception {
417    Configuration conf = HBaseConfiguration.create();
418    FileSystem fs = FileSystem.get(conf);
419
420    ColumnFamilyDescriptor hcd =
421      ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ)
422        .setDataBlockEncoding(DataBlockEncoding.DIFF).build();
423    init(name.getMethodName(), conf, hcd);
424
425    // Test createWriter
426    StoreFileWriter writer = store.getStoreEngine()
427      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4)
428        .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true)
429        .includesTag(false).shouldDropBehind(false));
430    Path path = writer.getPath();
431    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
432    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
433    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
434    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
435    writer.close();
436
437    // Verify that compression and encoding settings are respected
438    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
439    assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
440    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
441    reader.close();
442  }
443
444  @Test
445  public void testDeleteExpiredStoreFiles() throws Exception {
446    testDeleteExpiredStoreFiles(0);
447    testDeleteExpiredStoreFiles(1);
448  }
449
450  /**
451   * @param minVersions the MIN_VERSIONS for the column family
452   */
453  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
454    int storeFileNum = 4;
455    int ttl = 4;
456    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
457    EnvironmentEdgeManagerTestHelper.injectEdge(edge);
458
459    Configuration conf = HBaseConfiguration.create();
460    // Enable the expired store file deletion
461    conf.setBoolean("hbase.store.delete.expired.storefile", true);
462    // Set the compaction threshold higher to avoid normal compactions.
463    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
464
465    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
466      .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());
467
468    long storeTtl = this.store.getScanInfo().getTtl();
469    long sleepTime = storeTtl / storeFileNum;
470    long timeStamp;
471    // There are 4 store files and the max time stamp difference among these
472    // store files will be (this.store.ttl / storeFileNum)
473    for (int i = 1; i <= storeFileNum; i++) {
474      LOG.info("Adding some data for the store file #" + i);
475      timeStamp = EnvironmentEdgeManager.currentTime();
476      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
477      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
478      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
479      flush(i);
480      edge.incrementTime(sleepTime);
481    }
482
483    // Verify the total number of store files
484    assertEquals(storeFileNum, this.store.getStorefiles().size());
485
486    // Each call will find one expired store file and delete it before compaction happens.
487    // There will be no compaction due to threshold above. Last file will not be replaced.
488    for (int i = 1; i <= storeFileNum - 1; i++) {
489      // verify the expired store file.
490      assertFalse(this.store.requestCompaction().isPresent());
491      Collection<HStoreFile> sfs = this.store.getStorefiles();
492      // Ensure i files are gone.
493      if (minVersions == 0) {
494        assertEquals(storeFileNum - i, sfs.size());
495        // Ensure only non-expired files remain.
496        for (HStoreFile sf : sfs) {
497          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
498        }
499      } else {
500        assertEquals(storeFileNum, sfs.size());
501      }
502      // Let the next store file expired.
503      edge.incrementTime(sleepTime);
504    }
505    assertFalse(this.store.requestCompaction().isPresent());
506
507    Collection<HStoreFile> sfs = this.store.getStorefiles();
508    // Assert the last expired file is not removed.
509    if (minVersions == 0) {
510      assertEquals(1, sfs.size());
511    }
512    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
513    assertTrue(ts < (edge.currentTime() - storeTtl));
514
515    for (HStoreFile sf : sfs) {
516      sf.closeStoreFile(true);
517    }
518  }
519
520  @Test
521  public void testLowestModificationTime() throws Exception {
522    Configuration conf = HBaseConfiguration.create();
523    FileSystem fs = FileSystem.get(conf);
524    // Initialize region
525    init(name.getMethodName(), conf);
526
527    int storeFileNum = 4;
528    for (int i = 1; i <= storeFileNum; i++) {
529      LOG.info("Adding some data for the store file #" + i);
530      this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null);
531      this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null);
532      this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null);
533      flush(i);
534    }
535    // after flush; check the lowest time stamp
536    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
537    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
538    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
539
540    // after compact; check the lowest time stamp
541    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
542    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
543    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
544    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
545  }
546
547  private static long getLowestTimeStampFromFS(FileSystem fs,
548    final Collection<HStoreFile> candidates) throws IOException {
549    long minTs = Long.MAX_VALUE;
550    if (candidates.isEmpty()) {
551      return minTs;
552    }
553    Path[] p = new Path[candidates.size()];
554    int i = 0;
555    for (HStoreFile sf : candidates) {
556      p[i] = sf.getPath();
557      ++i;
558    }
559
560    FileStatus[] stats = fs.listStatus(p);
561    if (stats == null || stats.length == 0) {
562      return minTs;
563    }
564    for (FileStatus s : stats) {
565      minTs = Math.min(minTs, s.getModificationTime());
566    }
567    return minTs;
568  }
569
570  //////////////////////////////////////////////////////////////////////////////
571  // Get tests
572  //////////////////////////////////////////////////////////////////////////////
573
574  private static final int BLOCKSIZE_SMALL = 8192;
575
576  /**
577   * Test for hbase-1686.
578   */
579  @Test
580  public void testEmptyStoreFile() throws IOException {
581    init(this.name.getMethodName());
582    // Write a store file.
583    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
584    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
585    flush(1);
586    // Now put in place an empty store file. Its a little tricky. Have to
587    // do manually with hacked in sequence id.
588    HStoreFile f = this.store.getStorefiles().iterator().next();
589    Path storedir = f.getPath().getParent();
590    long seqid = f.getMaxSequenceId();
591    Configuration c = HBaseConfiguration.create();
592    FileSystem fs = FileSystem.get(c);
593    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
594    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
595      .withOutputDir(storedir).withFileContext(meta).build();
596    w.appendMetadata(seqid + 1, false);
597    w.close();
598    this.store.close();
599    // Reopen it... should pick up two files
600    this.store =
601      new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
602    assertEquals(2, this.store.getStorefilesCount());
603
604    result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers);
605    assertEquals(1, result.size());
606  }
607
608  /**
609   * Getting data from memstore only
610   */
611  @Test
612  public void testGet_FromMemStoreOnly() throws IOException {
613    init(this.name.getMethodName());
614
615    // Put data in memstore
616    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
617    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
618    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
619    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
620    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
621    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
622
623    // Get
624    result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers);
625
626    // Compare
627    assertCheck();
628  }
629
630  @Test
631  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
632    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
633    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
634    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
635  }
636
637  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
638    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
639      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
640    long currentTs = 100;
641    long minTs = currentTs;
642    // the extra cell won't be flushed to disk,
643    // so the min of timerange will be different between memStore and hfile.
644    for (int i = 0; i != (maxVersion + 1); ++i) {
645      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
646      if (i == 1) {
647        minTs = currentTs;
648      }
649    }
650    flushStore(store, id++);
651
652    Collection<HStoreFile> files = store.getStorefiles();
653    assertEquals(1, files.size());
654    HStoreFile f = files.iterator().next();
655    f.initReader();
656    StoreFileReader reader = f.getReader();
657    assertEquals(minTs, reader.timeRange.getMin());
658    assertEquals(currentTs, reader.timeRange.getMax());
659  }
660
661  /**
662   * Getting data from files only
663   */
664  @Test
665  public void testGet_FromFilesOnly() throws IOException {
666    init(this.name.getMethodName());
667
668    // Put data in memstore
669    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
670    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
671    // flush
672    flush(1);
673
674    // Add more data
675    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
676    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
677    // flush
678    flush(2);
679
680    // Add more data
681    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
682    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
683    // flush
684    flush(3);
685
686    // Get
687    result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers);
688    // this.store.get(get, qualifiers, result);
689
690    // Need to sort the result since multiple files
691    Collections.sort(result, CellComparatorImpl.COMPARATOR);
692
693    // Compare
694    assertCheck();
695  }
696
697  /**
698   * Getting data from memstore and files
699   */
700  @Test
701  public void testGet_FromMemStoreAndFiles() throws IOException {
702    init(this.name.getMethodName());
703
704    // Put data in memstore
705    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
706    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
707    // flush
708    flush(1);
709
710    // Add more data
711    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
712    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
713    // flush
714    flush(2);
715
716    // Add more data
717    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
718    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
719
720    // Get
721    result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers);
722
723    // Need to sort the result since multiple files
724    Collections.sort(result, CellComparatorImpl.COMPARATOR);
725
726    // Compare
727    assertCheck();
728  }
729
730  private void flush(int storeFilessize) throws IOException {
731    flushStore(store, id++);
732    assertEquals(storeFilessize, this.store.getStorefiles().size());
733    assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount());
734  }
735
736  private void assertCheck() {
737    assertEquals(expected.size(), result.size());
738    for (int i = 0; i < expected.size(); i++) {
739      assertEquals(expected.get(i), result.get(i));
740    }
741  }
742
743  @After
744  public void tearDown() throws Exception {
745    EnvironmentEdgeManagerTestHelper.reset();
746    if (store != null) {
747      try {
748        store.close();
749      } catch (IOException e) {
750      }
751      store = null;
752    }
753    if (region != null) {
754      region.close();
755      region = null;
756    }
757  }
758
759  @AfterClass
760  public static void tearDownAfterClass() throws IOException {
761    TEST_UTIL.cleanupTestDir();
762  }
763
764  @Test
765  public void testHandleErrorsInFlush() throws Exception {
766    LOG.info("Setting up a faulty file system that cannot write");
767
768    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
769    User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" });
770    // Inject our faulty LocalFileSystem
771    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
772    user.runAs(new PrivilegedExceptionAction<Object>() {
773      @Override
774      public Object run() throws Exception {
775        // Make sure it worked (above is sensitive to caching details in hadoop core)
776        FileSystem fs = FileSystem.get(conf);
777        assertEquals(FaultyFileSystem.class, fs.getClass());
778
779        // Initialize region
780        init(name.getMethodName(), conf);
781
782        LOG.info("Adding some data");
783        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
784        store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
785        store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
786
787        LOG.info("Before flush, we should have no files");
788
789        Collection<StoreFileInfo> files =
790          store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
791        assertEquals(0, files != null ? files.size() : 0);
792
793        // flush
794        try {
795          LOG.info("Flushing");
796          flush(1);
797          fail("Didn't bubble up IOE!");
798        } catch (IOException ioe) {
799          assertTrue(ioe.getMessage().contains("Fault injected"));
800        }
801
802        LOG.info("After failed flush, we should still have no files!");
803        files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
804        assertEquals(0, files != null ? files.size() : 0);
805        store.getHRegion().getWAL().close();
806        return null;
807      }
808    });
809    FileSystem.closeAllForUGI(user.getUGI());
810  }
811
812  /**
813   * Faulty file system that will fail if you write past its fault position the FIRST TIME only;
814   * thereafter it will succeed. Used by {@link TestHRegion} too.
815   */
816  static class FaultyFileSystem extends FilterFileSystem {
817    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
818    private long faultPos = 200;
819    AtomicBoolean fault = new AtomicBoolean(true);
820
821    public FaultyFileSystem() {
822      super(new LocalFileSystem());
823      LOG.info("Creating faulty!");
824    }
825
826    @Override
827    public FSDataOutputStream create(Path p) throws IOException {
828      return new FaultyOutputStream(super.create(p), faultPos, fault);
829    }
830
831    @Override
832    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
833      int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
834      return new FaultyOutputStream(
835        super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress),
836        faultPos, fault);
837    }
838
839    @Override
840    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize,
841      short replication, long blockSize, Progressable progress) throws IOException {
842      // Fake it. Call create instead. The default implementation throws an IOE
843      // that this is not supported.
844      return create(f, overwrite, bufferSize, replication, blockSize, progress);
845    }
846  }
847
848  static class FaultyOutputStream extends FSDataOutputStream {
849    volatile long faultPos = Long.MAX_VALUE;
850    private final AtomicBoolean fault;
851
852    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
853      throws IOException {
854      super(out, null);
855      this.faultPos = faultPos;
856      this.fault = fault;
857    }
858
859    @Override
860    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
861      LOG.info("faulty stream write at pos " + getPos());
862      injectFault();
863      super.write(buf, offset, length);
864    }
865
866    private void injectFault() throws IOException {
867      if (this.fault.get() && getPos() >= faultPos) {
868        throw new IOException("Fault injected");
869      }
870    }
871  }
872
873  private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
874    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
875    storeFlushCtx.prepare();
876    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
877    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
878    return storeFlushCtx;
879  }
880
881  /**
882   * Generate a list of KeyValues for testing based on given parameters
883   * @return the rows key-value list
884   */
885  private List<Cell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier,
886    byte[] family) {
887    List<Cell> kvList = new ArrayList<>();
888    for (int i = 1; i <= numRows; i++) {
889      byte[] b = Bytes.toBytes(i);
890      for (long timestamp : timestamps) {
891        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
892      }
893    }
894    return kvList;
895  }
896
897  /**
898   * Test to ensure correctness when using Stores with multiple timestamps
899   */
900  @Test
901  public void testMultipleTimestamps() throws IOException {
902    int numRows = 1;
903    long[] timestamps1 = new long[] { 1, 5, 10, 20 };
904    long[] timestamps2 = new long[] { 30, 80 };
905
906    init(this.name.getMethodName());
907
908    List<Cell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family);
909    for (Cell kv : kvList1) {
910      this.store.add(kv, null);
911    }
912
913    flushStore(store, id++);
914
915    List<Cell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family);
916    for (Cell kv : kvList2) {
917      this.store.add(kv, null);
918    }
919
920    List<Cell> result;
921    Get get = new Get(Bytes.toBytes(1));
922    get.addColumn(family, qf1);
923
924    get.setTimeRange(0, 15);
925    result = HBaseTestingUtility.getFromStoreFile(store, get);
926    assertTrue(result.size() > 0);
927
928    get.setTimeRange(40, 90);
929    result = HBaseTestingUtility.getFromStoreFile(store, get);
930    assertTrue(result.size() > 0);
931
932    get.setTimeRange(10, 45);
933    result = HBaseTestingUtility.getFromStoreFile(store, get);
934    assertTrue(result.size() > 0);
935
936    get.setTimeRange(80, 145);
937    result = HBaseTestingUtility.getFromStoreFile(store, get);
938    assertTrue(result.size() > 0);
939
940    get.setTimeRange(1, 2);
941    result = HBaseTestingUtility.getFromStoreFile(store, get);
942    assertTrue(result.size() > 0);
943
944    get.setTimeRange(90, 200);
945    result = HBaseTestingUtility.getFromStoreFile(store, get);
946    assertTrue(result.size() == 0);
947  }
948
949  /**
950   * Test for HBASE-3492 - Test split on empty colfam (no store files).
951   * @throws IOException When the IO operations fail.
952   */
953  @Test
954  public void testSplitWithEmptyColFam() throws IOException {
955    init(this.name.getMethodName());
956    assertFalse(store.getSplitPoint().isPresent());
957  }
958
959  @Test
960  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
961    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
962    long anyValue = 10;
963
964    // We'll check that it uses correct config and propagates it appropriately by going thru
965    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
966    // a number we pass in is higher than some config value, inside compactionPolicy.
967    Configuration conf = HBaseConfiguration.create();
968    conf.setLong(CONFIG_KEY, anyValue);
969    init(name.getMethodName() + "-xml", conf);
970    assertTrue(store.throttleCompaction(anyValue + 1));
971    assertFalse(store.throttleCompaction(anyValue));
972
973    // HTD overrides XML.
974    --anyValue;
975    init(
976      name.getMethodName() + "-htd", conf, TableDescriptorBuilder
977        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
978      ColumnFamilyDescriptorBuilder.of(family));
979    assertTrue(store.throttleCompaction(anyValue + 1));
980    assertFalse(store.throttleCompaction(anyValue));
981
982    // HCD overrides them both.
983    --anyValue;
984    init(name.getMethodName() + "-hcd", conf,
985      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
986        Long.toString(anyValue)),
987      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
988        .build());
989    assertTrue(store.throttleCompaction(anyValue + 1));
990    assertFalse(store.throttleCompaction(anyValue));
991  }
992
993  public static class DummyStoreEngine extends DefaultStoreEngine {
994    public static DefaultCompactor lastCreatedCompactor = null;
995
996    @Override
997    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
998      throws IOException {
999      super.createComponents(conf, store, comparator);
1000      lastCreatedCompactor = this.compactor;
1001    }
1002  }
1003
1004  @Test
1005  public void testStoreUsesSearchEngineOverride() throws Exception {
1006    Configuration conf = HBaseConfiguration.create();
1007    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
1008    init(this.name.getMethodName(), conf);
1009    assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor());
1010  }
1011
1012  private void addStoreFile() throws IOException {
1013    HStoreFile f = this.store.getStorefiles().iterator().next();
1014    Path storedir = f.getPath().getParent();
1015    long seqid = this.store.getMaxSequenceId().orElse(0L);
1016    Configuration c = TEST_UTIL.getConfiguration();
1017    FileSystem fs = FileSystem.get(c);
1018    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
1019    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
1020      .withOutputDir(storedir).withFileContext(fileContext).build();
1021    w.appendMetadata(seqid + 1, false);
1022    w.close();
1023    LOG.info("Added store file:" + w.getPath());
1024  }
1025
1026  private void archiveStoreFile(int index) throws IOException {
1027    Collection<HStoreFile> files = this.store.getStorefiles();
1028    HStoreFile sf = null;
1029    Iterator<HStoreFile> it = files.iterator();
1030    for (int i = 0; i <= index; i++) {
1031      sf = it.next();
1032    }
1033    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(),
1034      Lists.newArrayList(sf));
1035  }
1036
1037  private void closeCompactedFile(int index) throws IOException {
1038    Collection<HStoreFile> files =
1039      this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
1040    if (files.size() > 0) {
1041      HStoreFile sf = null;
1042      Iterator<HStoreFile> it = files.iterator();
1043      for (int i = 0; i <= index; i++) {
1044        sf = it.next();
1045      }
1046      sf.closeStoreFile(true);
1047      store.getStoreEngine().getStoreFileManager()
1048        .removeCompactedFiles(Collections.singletonList(sf));
1049    }
1050  }
1051
1052  @Test
1053  public void testRefreshStoreFiles() throws Exception {
1054    init(name.getMethodName());
1055
1056    assertEquals(0, this.store.getStorefilesCount());
1057
1058    // Test refreshing store files when no store files are there
1059    store.refreshStoreFiles();
1060    assertEquals(0, this.store.getStorefilesCount());
1061
1062    // add some data, flush
1063    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
1064    flush(1);
1065    assertEquals(1, this.store.getStorefilesCount());
1066
1067    // add one more file
1068    addStoreFile();
1069
1070    assertEquals(1, this.store.getStorefilesCount());
1071    store.refreshStoreFiles();
1072    assertEquals(2, this.store.getStorefilesCount());
1073
1074    // add three more files
1075    addStoreFile();
1076    addStoreFile();
1077    addStoreFile();
1078
1079    assertEquals(2, this.store.getStorefilesCount());
1080    store.refreshStoreFiles();
1081    assertEquals(5, this.store.getStorefilesCount());
1082
1083    closeCompactedFile(0);
1084    archiveStoreFile(0);
1085
1086    assertEquals(5, this.store.getStorefilesCount());
1087    store.refreshStoreFiles();
1088    assertEquals(4, this.store.getStorefilesCount());
1089
1090    archiveStoreFile(0);
1091    archiveStoreFile(1);
1092    archiveStoreFile(2);
1093
1094    assertEquals(4, this.store.getStorefilesCount());
1095    store.refreshStoreFiles();
1096    assertEquals(1, this.store.getStorefilesCount());
1097
1098    archiveStoreFile(0);
1099    store.refreshStoreFiles();
1100    assertEquals(0, this.store.getStorefilesCount());
1101  }
1102
1103  @Test
1104  public void testRefreshStoreFilesNotChanged() throws IOException {
1105    init(name.getMethodName());
1106
1107    assertEquals(0, this.store.getStorefilesCount());
1108
1109    // add some data, flush
1110    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
1111    flush(1);
1112    // add one more file
1113    addStoreFile();
1114
1115    StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());
1116
1117    // call first time after files changed
1118    spiedStoreEngine.refreshStoreFiles();
1119    assertEquals(2, this.store.getStorefilesCount());
1120    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
1121
1122    // call second time
1123    spiedStoreEngine.refreshStoreFiles();
1124
1125    // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
1126    // refreshed,
1127    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
1128  }
1129
1130  @Test
1131  public void testScanWithCompactionAfterFlush() throws Exception {
1132    TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
1133      EverythingPolicy.class.getName());
1134    init(name.getMethodName());
1135
1136    assertEquals(0, this.store.getStorefilesCount());
1137
1138    KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null);
1139    // add some data, flush
1140    this.store.add(kv, null);
1141    flush(1);
1142    kv = new KeyValue(row, family, qf2, 1, (byte[]) null);
1143    // add some data, flush
1144    this.store.add(kv, null);
1145    flush(2);
1146    kv = new KeyValue(row, family, qf3, 1, (byte[]) null);
1147    // add some data, flush
1148    this.store.add(kv, null);
1149    flush(3);
1150
1151    ExecutorService service = Executors.newFixedThreadPool(2);
1152
1153    Scan scan = new Scan(new Get(row));
1154    Future<KeyValueScanner> scanFuture = service.submit(() -> {
1155      try {
1156        LOG.info(">>>> creating scanner");
1157        return this.store.createScanner(scan,
1158          new ScanInfo(HBaseConfiguration.create(),
1159            ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(),
1160            Long.MAX_VALUE, 0, CellComparator.getInstance()),
1161          scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0);
1162      } catch (IOException e) {
1163        e.printStackTrace();
1164        return null;
1165      }
1166    });
1167    Future compactFuture = service.submit(() -> {
1168      try {
1169        LOG.info(">>>>>> starting compaction");
1170        Optional<CompactionContext> opCompaction = this.store.requestCompaction();
1171        assertTrue(opCompaction.isPresent());
1172        store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent());
1173        LOG.info(">>>>>> Compaction is finished");
1174        this.store.closeAndArchiveCompactedFiles();
1175        LOG.info(">>>>>> Compacted files deleted");
1176      } catch (IOException e) {
1177        e.printStackTrace();
1178      }
1179    });
1180
1181    KeyValueScanner kvs = scanFuture.get();
1182    compactFuture.get();
1183    ((StoreScanner) kvs).currentScanners.forEach(s -> {
1184      if (s instanceof StoreFileScanner) {
1185        assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount());
1186      }
1187    });
1188    kvs.seek(kv);
1189    service.shutdownNow();
1190  }
1191
1192  private long countMemStoreScanner(StoreScanner scanner) {
1193    if (scanner.currentScanners == null) {
1194      return 0;
1195    }
1196    return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count();
1197  }
1198
1199  @Test
1200  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
1201    long seqId = 100;
1202    long timestamp = EnvironmentEdgeManager.currentTime();
1203    Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1204      .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1205    PrivateCellUtil.setSequenceId(cell0, seqId);
1206    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());
1207
1208    Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1209      .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1210    PrivateCellUtil.setSequenceId(cell1, seqId);
1211    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
1212
1213    seqId = 101;
1214    timestamp = EnvironmentEdgeManager.currentTime();
1215    Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
1216      .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
1217    PrivateCellUtil.setSequenceId(cell2, seqId);
1218    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
1219  }
1220
1221  private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
1222    List<Cell> inputCellsAfterSnapshot) throws IOException {
1223    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
1224    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1225    long seqId = Long.MIN_VALUE;
1226    for (Cell c : inputCellsBeforeSnapshot) {
1227      quals.add(CellUtil.cloneQualifier(c));
1228      seqId = Math.max(seqId, c.getSequenceId());
1229    }
1230    for (Cell c : inputCellsAfterSnapshot) {
1231      quals.add(CellUtil.cloneQualifier(c));
1232      seqId = Math.max(seqId, c.getSequenceId());
1233    }
1234    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
1235    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1236    storeFlushCtx.prepare();
1237    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
1238    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
1239    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
1240      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
1241      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
1242      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1243      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1244      // snapshot has no data after flush
1245      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
1246      boolean more;
1247      int cellCount = 0;
1248      do {
1249        List<Cell> cells = new ArrayList<>();
1250        more = s.next(cells);
1251        cellCount += cells.size();
1252        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
1253      } while (more);
1254      assertEquals(
1255        "The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
1256          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
1257        inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
1258      // the current scanners is cleared
1259      assertEquals(0, countMemStoreScanner(s));
1260    }
1261  }
1262
1263  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
1264    throws IOException {
1265    return createCell(row, qualifier, ts, sequenceId, value);
1266  }
1267
1268  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
1269    throws IOException {
1270    Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
1271      .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put).setValue(value).build();
1272    PrivateCellUtil.setSequenceId(c, sequenceId);
1273    return c;
1274  }
1275
1276  @Test
1277  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
1278    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1279    final int expectedSize = 3;
1280    testFlushBeforeCompletingScan(new MyListHook() {
1281      @Override
1282      public void hook(int currentSize) {
1283        if (currentSize == expectedSize - 1) {
1284          try {
1285            flushStore(store, id++);
1286            timeToGoNextRow.set(true);
1287          } catch (IOException e) {
1288            throw new RuntimeException(e);
1289          }
1290        }
1291      }
1292    }, new FilterBase() {
1293      @Override
1294      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1295        return ReturnCode.INCLUDE;
1296      }
1297    }, expectedSize);
1298  }
1299
1300  @Test
1301  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
1302    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
1303    final int expectedSize = 2;
1304    testFlushBeforeCompletingScan(new MyListHook() {
1305      @Override
1306      public void hook(int currentSize) {
1307        if (currentSize == expectedSize - 1) {
1308          try {
1309            flushStore(store, id++);
1310            timeToGoNextRow.set(true);
1311          } catch (IOException e) {
1312            throw new RuntimeException(e);
1313          }
1314        }
1315      }
1316    }, new FilterBase() {
1317      @Override
1318      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1319        if (timeToGoNextRow.get()) {
1320          timeToGoNextRow.set(false);
1321          return ReturnCode.NEXT_ROW;
1322        } else {
1323          return ReturnCode.INCLUDE;
1324        }
1325      }
1326    }, expectedSize);
1327  }
1328
1329  @Test
1330  public void testFlushBeforeCompletingScanWithFilterHint()
1331    throws IOException, InterruptedException {
1332    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
1333    final int expectedSize = 2;
1334    testFlushBeforeCompletingScan(new MyListHook() {
1335      @Override
1336      public void hook(int currentSize) {
1337        if (currentSize == expectedSize - 1) {
1338          try {
1339            flushStore(store, id++);
1340            timeToGetHint.set(true);
1341          } catch (IOException e) {
1342            throw new RuntimeException(e);
1343          }
1344        }
1345      }
1346    }, new FilterBase() {
1347      @Override
1348      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
1349        if (timeToGetHint.get()) {
1350          timeToGetHint.set(false);
1351          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
1352        } else {
1353          return Filter.ReturnCode.INCLUDE;
1354        }
1355      }
1356
1357      @Override
1358      public Cell getNextCellHint(Cell currentCell) throws IOException {
1359        return currentCell;
1360      }
1361    }, expectedSize);
1362  }
1363
1364  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
1365    throws IOException, InterruptedException {
1366    Configuration conf = HBaseConfiguration.create();
1367    byte[] r0 = Bytes.toBytes("row0");
1368    byte[] r1 = Bytes.toBytes("row1");
1369    byte[] r2 = Bytes.toBytes("row2");
1370    byte[] value0 = Bytes.toBytes("value0");
1371    byte[] value1 = Bytes.toBytes("value1");
1372    byte[] value2 = Bytes.toBytes("value2");
1373    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1374    long ts = EnvironmentEdgeManager.currentTime();
1375    long seqId = 100;
1376    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1377      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
1378      new MyStoreHook() {
1379        @Override
1380        public long getSmallestReadPoint(HStore store) {
1381          return seqId + 3;
1382        }
1383      });
1384    // The cells having the value0 won't be flushed to disk because the value of max version is 1
1385    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
1386    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
1387    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
1388    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
1389    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
1390    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
1391    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
1392    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
1393    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
1394    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
1395    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
1396    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
1397    List<Cell> myList = new MyList<>(hook);
1398    Scan scan = new Scan().withStartRow(r1).setFilter(filter);
1399    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
1400      // r1
1401      scanner.next(myList);
1402      assertEquals(expectedSize, myList.size());
1403      for (Cell c : myList) {
1404        byte[] actualValue = CellUtil.cloneValue(c);
1405        assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:"
1406          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
1407      }
1408      List<Cell> normalList = new ArrayList<>(3);
1409      // r2
1410      scanner.next(normalList);
1411      assertEquals(3, normalList.size());
1412      for (Cell c : normalList) {
1413        byte[] actualValue = CellUtil.cloneValue(c);
1414        assertTrue("expected:" + Bytes.toStringBinary(value2) + ", actual:"
1415          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value2));
1416      }
1417    }
1418  }
1419
1420  @Test
1421  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
1422    Configuration conf = HBaseConfiguration.create();
1423    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
1424    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1425      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1426    byte[] value = Bytes.toBytes("value");
1427    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1428    long ts = EnvironmentEdgeManager.currentTime();
1429    long seqId = 100;
1430    // older data whihc shouldn't be "seen" by client
1431    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1432    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1433    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1434    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1435    quals.add(qf1);
1436    quals.add(qf2);
1437    quals.add(qf3);
1438    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1439    MyCompactingMemStore.START_TEST.set(true);
1440    Runnable flush = () -> {
1441      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
1442      // recreate the active memstore -- phase (4/5)
1443      storeFlushCtx.prepare();
1444    };
1445    ExecutorService service = Executors.newSingleThreadExecutor();
1446    service.execute(flush);
1447    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
1448    // this is blocked until we recreate the active memstore -- phase (3/5)
1449    // we get scanner from active memstore but it is empty -- phase (5/5)
1450    InternalScanner scanner =
1451      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
1452    service.shutdown();
1453    service.awaitTermination(20, TimeUnit.SECONDS);
1454    try {
1455      try {
1456        List<Cell> results = new ArrayList<>();
1457        scanner.next(results);
1458        assertEquals(3, results.size());
1459        for (Cell c : results) {
1460          byte[] actualValue = CellUtil.cloneValue(c);
1461          assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
1462            + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
1463        }
1464      } finally {
1465        scanner.close();
1466      }
1467    } finally {
1468      MyCompactingMemStore.START_TEST.set(false);
1469      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1470      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1471    }
1472  }
1473
1474  @Test
1475  public void testScanWithDoubleFlush() throws IOException {
1476    Configuration conf = HBaseConfiguration.create();
1477    // Initialize region
1478    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1479      @Override
1480      public void getScanners(MyStore store) throws IOException {
1481        final long tmpId = id++;
1482        ExecutorService s = Executors.newSingleThreadExecutor();
1483        s.execute(() -> {
1484          try {
1485            // flush the store before storescanner updates the scanners from store.
1486            // The current data will be flushed into files, and the memstore will
1487            // be clear.
1488            // -- phase (4/4)
1489            flushStore(store, tmpId);
1490          } catch (IOException ex) {
1491            throw new RuntimeException(ex);
1492          }
1493        });
1494        s.shutdown();
1495        try {
1496          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
1497          s.awaitTermination(3, TimeUnit.SECONDS);
1498        } catch (InterruptedException ex) {
1499        }
1500      }
1501    });
1502    byte[] oldValue = Bytes.toBytes("oldValue");
1503    byte[] currentValue = Bytes.toBytes("currentValue");
1504    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1505    long ts = EnvironmentEdgeManager.currentTime();
1506    long seqId = 100;
1507    // older data whihc shouldn't be "seen" by client
1508    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
1509    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
1510    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
1511    long snapshotId = id++;
1512    // push older data into snapshot -- phase (1/4)
1513    StoreFlushContext storeFlushCtx =
1514      store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
1515    storeFlushCtx.prepare();
1516
1517    // insert current data into active -- phase (2/4)
1518    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
1519    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
1520    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
1521    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1522    quals.add(qf1);
1523    quals.add(qf2);
1524    quals.add(qf3);
1525    try (InternalScanner scanner =
1526      (InternalScanner) myStore.getScanner(new Scan(new Get(row)), quals, seqId + 1)) {
1527      // complete the flush -- phase (3/4)
1528      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1529      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1530
1531      List<Cell> results = new ArrayList<>();
1532      scanner.next(results);
1533      assertEquals(3, results.size());
1534      for (Cell c : results) {
1535        byte[] actualValue = CellUtil.cloneValue(c);
1536        assertTrue("expected:" + Bytes.toStringBinary(currentValue) + ", actual:"
1537          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, currentValue));
1538      }
1539    }
1540  }
1541
1542  /**
1543   * This test is for HBASE-27519, when the {@link StoreScanner} is scanning,the Flush and the
1544   * Compaction execute concurrently and theCcompaction compact and archive the flushed
1545   * {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}.Before
1546   * HBASE-27519,{@link StoreScanner.updateReaders} would throw {@link FileNotFoundException}.
1547   */
1548  @Test
1549  public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
1550    Configuration conf = HBaseConfiguration.create();
1551    conf.setBoolean(WALFactory.WAL_ENABLED, false);
1552    conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
1553    byte[] r0 = Bytes.toBytes("row0");
1554    byte[] r1 = Bytes.toBytes("row1");
1555    final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
1556    final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
1557    // Initialize region
1558    final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1559      @Override
1560      public void getScanners(MyStore store) throws IOException {
1561        try {
1562          // Here this method is called by StoreScanner.updateReaders which is invoked by the
1563          // following TestHStore.flushStore
1564          if (shouldWaitRef.get()) {
1565            // wait the following compaction Task start
1566            cyclicBarrier.await();
1567            // wait the following HStore.closeAndArchiveCompactedFiles end.
1568            cyclicBarrier.await();
1569          }
1570        } catch (BrokenBarrierException | InterruptedException e) {
1571          throw new RuntimeException(e);
1572        }
1573      }
1574    });
1575
1576    final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null);
1577    Runnable compactionTask = () -> {
1578      try {
1579        // Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for
1580        // entering the MyStore.getScanners, compactionTask could start.
1581        cyclicBarrier.await();
1582        region.compactStore(family, new NoLimitThroughputController());
1583        myStore.closeAndArchiveCompactedFiles();
1584        // Notify StoreScanner.updateReaders could enter MyStore.getScanners.
1585        cyclicBarrier.await();
1586      } catch (Throwable e) {
1587        compactionExceptionRef.set(e);
1588      }
1589    };
1590
1591    long ts = EnvironmentEdgeManager.currentTime();
1592    long seqId = 100;
1593    byte[] value = Bytes.toBytes("value");
1594    // older data whihc shouldn't be "seen" by client
1595    myStore.add(createCell(r0, qf1, ts, seqId, value), null);
1596    flushStore(myStore, id++);
1597    myStore.add(createCell(r0, qf2, ts, seqId, value), null);
1598    flushStore(myStore, id++);
1599    myStore.add(createCell(r0, qf3, ts, seqId, value), null);
1600    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1601    quals.add(qf1);
1602    quals.add(qf2);
1603    quals.add(qf3);
1604
1605    myStore.add(createCell(r1, qf1, ts, seqId, value), null);
1606    myStore.add(createCell(r1, qf2, ts, seqId, value), null);
1607    myStore.add(createCell(r1, qf3, ts, seqId, value), null);
1608
1609    Thread.currentThread()
1610      .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
1611    Scan scan = new Scan();
1612    scan.withStartRow(r0, true);
1613    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
1614      List<Cell> results = new MyList<>(size -> {
1615        switch (size) {
1616          case 1:
1617            shouldWaitRef.set(true);
1618            Thread thread = new Thread(compactionTask);
1619            thread.setName("MyCompacting Thread.");
1620            thread.start();
1621            try {
1622              flushStore(myStore, id++);
1623              thread.join();
1624            } catch (IOException | InterruptedException e) {
1625              throw new RuntimeException(e);
1626            }
1627            shouldWaitRef.set(false);
1628            break;
1629          default:
1630            break;
1631        }
1632      });
1633      // Before HBASE-27519, here would throw java.io.FileNotFoundException because the storeFile
1634      // which used by StoreScanner.updateReaders is deleted by compactionTask.
1635      scanner.next(results);
1636      // The results is r0 row cells.
1637      assertEquals(3, results.size());
1638      assertTrue(compactionExceptionRef.get() == null);
1639    }
1640  }
1641
1642  @Test
1643  public void testReclaimChunkWhenScaning() throws IOException {
1644    init("testReclaimChunkWhenScaning");
1645    long ts = EnvironmentEdgeManager.currentTime();
1646    long seqId = 100;
1647    byte[] value = Bytes.toBytes("value");
1648    // older data whihc shouldn't be "seen" by client
1649    store.add(createCell(qf1, ts, seqId, value), null);
1650    store.add(createCell(qf2, ts, seqId, value), null);
1651    store.add(createCell(qf3, ts, seqId, value), null);
1652    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1653    quals.add(qf1);
1654    quals.add(qf2);
1655    quals.add(qf3);
1656    try (InternalScanner scanner =
1657      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) {
1658      List<Cell> results = new MyList<>(size -> {
1659        switch (size) {
1660          // 1) we get the first cell (qf1)
1661          // 2) flush the data to have StoreScanner update inner scanners
1662          // 3) the chunk will be reclaimed after updaing
1663          case 1:
1664            try {
1665              flushStore(store, id++);
1666            } catch (IOException e) {
1667              throw new RuntimeException(e);
1668            }
1669            break;
1670          // 1) we get the second cell (qf2)
1671          // 2) add some cell to fill some byte into the chunk (we have only one chunk)
1672          case 2:
1673            try {
1674              byte[] newValue = Bytes.toBytes("newValue");
1675              // older data whihc shouldn't be "seen" by client
1676              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
1677              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
1678              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
1679            } catch (IOException e) {
1680              throw new RuntimeException(e);
1681            }
1682            break;
1683          default:
1684            break;
1685        }
1686      });
1687      scanner.next(results);
1688      assertEquals(3, results.size());
1689      for (Cell c : results) {
1690        byte[] actualValue = CellUtil.cloneValue(c);
1691        assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
1692          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
1693      }
1694    }
1695  }
1696
1697  /**
1698   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the
1699   * versionedList. And the first InMemoryFlushRunnable will use the chagned versionedList to remove
1700   * the corresponding segments. In short, there will be some segements which isn't in merge are
1701   * removed.
1702   */
1703  @Test
1704  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
1705    int flushSize = 500;
1706    Configuration conf = HBaseConfiguration.create();
1707    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
1708    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
1709    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
1710    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
1711    // Set the lower threshold to invoke the "MERGE" policy
1712    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
1713    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
1714      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
1715    byte[] value = Bytes.toBytes("thisisavarylargevalue");
1716    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1717    long ts = EnvironmentEdgeManager.currentTime();
1718    long seqId = 100;
1719    // older data whihc shouldn't be "seen" by client
1720    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
1721    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
1722    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
1723    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1724    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
1725    storeFlushCtx.prepare();
1726    // This shouldn't invoke another in-memory flush because the first compactor thread
1727    // hasn't accomplished the in-memory compaction.
1728    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1729    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1730    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
1731    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1732    // okay. Let the compaction be completed
1733    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
1734    CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore;
1735    while (mem.isMemStoreFlushingInMemory()) {
1736      TimeUnit.SECONDS.sleep(1);
1737    }
1738    // This should invoke another in-memory flush.
1739    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1740    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1741    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
1742    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
1743    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
1744      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
1745    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
1746    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
1747  }
1748
1749  @Test
1750  public void testAge() throws IOException {
1751    long currentTime = EnvironmentEdgeManager.currentTime();
1752    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
1753    edge.setValue(currentTime);
1754    EnvironmentEdgeManager.injectEdge(edge);
1755    Configuration conf = TEST_UTIL.getConfiguration();
1756    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
1757    initHRegion(name.getMethodName(), conf,
1758      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
1759    HStore store = new HStore(region, hcd, conf, false) {
1760
1761      @Override
1762      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
1763        CellComparator kvComparator) throws IOException {
1764        List<HStoreFile> storefiles =
1765          Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
1766            mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
1767        StoreFileManager sfm = mock(StoreFileManager.class);
1768        when(sfm.getStorefiles()).thenReturn(storefiles);
1769        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
1770        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
1771        return storeEngine;
1772      }
1773    };
1774    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
1775    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
1776    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
1777  }
1778
1779  private HStoreFile mockStoreFile(long createdTime) {
1780    StoreFileInfo info = mock(StoreFileInfo.class);
1781    when(info.getCreatedTimestamp()).thenReturn(createdTime);
1782    HStoreFile sf = mock(HStoreFile.class);
1783    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
1784    when(sf.isHFile()).thenReturn(true);
1785    when(sf.getFileInfo()).thenReturn(info);
1786    return sf;
1787  }
1788
1789  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
1790    throws IOException {
1791    return (MyStore) init(methodName, conf,
1792      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1793      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
1794  }
1795
1796  private static class MyStore extends HStore {
1797    private final MyStoreHook hook;
1798
1799    MyStore(final HRegion region, final ColumnFamilyDescriptor family,
1800      final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
1801      super(region, family, confParam, false);
1802      this.hook = hook;
1803    }
1804
1805    @Override
1806    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
1807      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1808      boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
1809      boolean includeMemstoreScanner) throws IOException {
1810      hook.getScanners(this);
1811      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
1812        stopRow, false, readPt, includeMemstoreScanner);
1813    }
1814
1815    @Override
1816    public long getSmallestReadPoint() {
1817      return hook.getSmallestReadPoint(this);
1818    }
1819  }
1820
1821  private abstract static class MyStoreHook {
1822
1823    void getScanners(MyStore store) throws IOException {
1824    }
1825
1826    long getSmallestReadPoint(HStore store) {
1827      return store.getHRegion().getSmallestReadPoint();
1828    }
1829  }
1830
1831  @Test
1832  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
1833    Configuration conf = HBaseConfiguration.create();
1834    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1835    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
1836    // Set the lower threshold to invoke the "MERGE" policy
1837    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1838    });
1839    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1840    long ts = EnvironmentEdgeManager.currentTime();
1841    long seqID = 1L;
1842    // Add some data to the region and do some flushes
1843    for (int i = 1; i < 10; i++) {
1844      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1845        memStoreSizing);
1846    }
1847    // flush them
1848    flushStore(store, seqID);
1849    for (int i = 11; i < 20; i++) {
1850      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1851        memStoreSizing);
1852    }
1853    // flush them
1854    flushStore(store, seqID);
1855    for (int i = 21; i < 30; i++) {
1856      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1857        memStoreSizing);
1858    }
1859    // flush them
1860    flushStore(store, seqID);
1861
1862    assertEquals(3, store.getStorefilesCount());
1863    Scan scan = new Scan();
1864    scan.addFamily(family);
1865    Collection<HStoreFile> storefiles2 = store.getStorefiles();
1866    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
1867    StoreScanner storeScanner =
1868      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1869    // get the current heap
1870    KeyValueHeap heap = storeScanner.heap;
1871    // create more store files
1872    for (int i = 31; i < 40; i++) {
1873      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1874        memStoreSizing);
1875    }
1876    // flush them
1877    flushStore(store, seqID);
1878
1879    for (int i = 41; i < 50; i++) {
1880      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
1881        memStoreSizing);
1882    }
1883    // flush them
1884    flushStore(store, seqID);
1885    storefiles2 = store.getStorefiles();
1886    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
1887    actualStorefiles1.removeAll(actualStorefiles);
1888    // Do compaction
1889    MyThread thread = new MyThread(storeScanner);
1890    thread.start();
1891    store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
1892    thread.join();
1893    KeyValueHeap heap2 = thread.getHeap();
1894    assertFalse(heap.equals(heap2));
1895  }
1896
1897  @Test
1898  public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception {
1899    Configuration conf = HBaseConfiguration.create();
1900    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
1901    // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type.
1902    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1);
1903    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
1904    });
1905    Scan scan = new Scan();
1906    scan.addFamily(family);
1907    // ReadType on Scan is still DEFAULT only.
1908    assertEquals(ReadType.DEFAULT, scan.getReadType());
1909    StoreScanner storeScanner =
1910      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
1911    assertFalse(storeScanner.isScanUsePread());
1912  }
1913
1914  @Test
1915  public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException {
1916    Configuration conf = HBaseConfiguration.create();
1917    conf.set("hbase.systemtables.compacting.memstore.type", "eager");
1918    init(name.getMethodName(), conf,
1919      TableDescriptorBuilder.newBuilder(
1920        TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())),
1921      ColumnFamilyDescriptorBuilder.newBuilder(family)
1922        .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build());
1923    assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString()
1924      .startsWith("eager".toUpperCase()));
1925  }
1926
1927  @Test
1928  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
1929    final TableName tn = TableName.valueOf(name.getMethodName());
1930    init(name.getMethodName());
1931
1932    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();
1933
1934    HStoreFile sf1 = mockStoreFileWithLength(1024L);
1935    HStoreFile sf2 = mockStoreFileWithLength(2048L);
1936    HStoreFile sf3 = mockStoreFileWithLength(4096L);
1937    HStoreFile sf4 = mockStoreFileWithLength(8192L);
1938
1939    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
1940      .setEndKey(Bytes.toBytes("b")).build();
1941
1942    // Compacting two files down to one, reducing size
1943    sizeStore.put(regionInfo, 1024L + 4096L);
1944    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3),
1945      Arrays.asList(sf2));
1946
1947    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1948
1949    // The same file length in and out should have no change
1950    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
1951      Arrays.asList(sf2));
1952
1953    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());
1954
1955    // Increase the total size used
1956    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
1957      Arrays.asList(sf3));
1958
1959    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());
1960
1961    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
1962      .setEndKey(Bytes.toBytes("c")).build();
1963    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));
1964
1965    assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
1966  }
1967
1968  @Test
1969  public void testHFileContextSetWithCFAndTable() throws Exception {
1970    init(this.name.getMethodName());
1971    StoreFileWriter writer = store.getStoreEngine()
1972      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L)
1973        .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true)
1974        .includesTag(false).shouldDropBehind(true));
1975    HFileContext hFileContext = writer.getHFileWriter().getFileContext();
1976    assertArrayEquals(family, hFileContext.getColumnFamily());
1977    assertArrayEquals(table, hFileContext.getTableName());
1978  }
1979
1980  // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell
1981  // but its dataSize exceeds inmemoryFlushSize
1982  @Test
1983  public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize()
1984    throws IOException, InterruptedException {
1985    Configuration conf = HBaseConfiguration.create();
1986
1987    byte[] smallValue = new byte[3];
1988    byte[] largeValue = new byte[9];
1989    final long timestamp = EnvironmentEdgeManager.currentTime();
1990    final long seqId = 100;
1991    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
1992    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
1993    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
1994    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
1995    int flushByteSize = smallCellByteSize + largeCellByteSize - 2;
1996
1997    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
1998    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName());
1999    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2000    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2001
2002    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2003      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2004
2005    MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
2006    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2007    myCompactingMemStore.smallCellPreUpdateCounter.set(0);
2008    myCompactingMemStore.largeCellPreUpdateCounter.set(0);
2009
2010    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2011    Thread smallCellThread = new Thread(() -> {
2012      try {
2013        store.add(smallCell, new NonThreadSafeMemStoreSizing());
2014      } catch (Throwable exception) {
2015        exceptionRef.set(exception);
2016      }
2017    });
2018    smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
2019    smallCellThread.start();
2020
2021    String oldThreadName = Thread.currentThread().getName();
2022    try {
2023      /**
2024       * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
2025       * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread
2026       * invokes flushInMemory.
2027       * <p/>
2028       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
2029       * can add cell to currentActive . That is to say when largeCellThread called flushInMemory
2030       * method, CompactingMemStore.active has no cell.
2031       */
2032      Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME);
2033      store.add(largeCell, new NonThreadSafeMemStoreSizing());
2034      smallCellThread.join();
2035
2036      for (int i = 0; i < 100; i++) {
2037        long currentTimestamp = timestamp + 100 + i;
2038        Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
2039        store.add(cell, new NonThreadSafeMemStoreSizing());
2040      }
2041    } finally {
2042      Thread.currentThread().setName(oldThreadName);
2043    }
2044
2045    assertTrue(exceptionRef.get() == null);
2046
2047  }
2048
2049  // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds
2050  // InmemoryFlushSize
2051  @Test(timeout = 60000)
2052  public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception {
2053    Configuration conf = HBaseConfiguration.create();
2054    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
2055
2056    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2057      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2058
2059    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
2060
2061    int size = (int) (myCompactingMemStore.getInmemoryFlushSize());
2062    byte[] value = new byte[size + 1];
2063
2064    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2065    long timestamp = EnvironmentEdgeManager.currentTime();
2066    long seqId = 100;
2067    Cell cell = createCell(qf1, timestamp, seqId, value);
2068    int cellByteSize = MutableSegment.getCellLength(cell);
2069    store.add(cell, memStoreSizing);
2070    assertTrue(memStoreSizing.getCellsCount() == 1);
2071    assertTrue(memStoreSizing.getDataSize() == cellByteSize);
2072    // Waiting the in memory compaction completed, see HBASE-26438
2073    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2074  }
2075
2076  /**
2077   * This test is for HBASE-27464, before this JIRA,when init {@link CellChunkImmutableSegment} for
2078   * 'COMPACT' action, we not force copy to current MSLab. When cell size bigger than
2079   * {@link MemStoreLABImpl#maxAlloc}, cell will stay in previous chunk which will recycle after
2080   * segment replace, and we may read wrong data when these chunk reused by others.
2081   */
2082  @Test
2083  public void testForceCloneOfBigCellForCellChunkImmutableSegment() throws Exception {
2084    Configuration conf = HBaseConfiguration.create();
2085    int maxAllocByteSize = conf.getInt(MemStoreLAB.MAX_ALLOC_KEY, MemStoreLAB.MAX_ALLOC_DEFAULT);
2086
2087    // Construct big cell,which is large than {@link MemStoreLABImpl#maxAlloc}.
2088    byte[] cellValue = new byte[maxAllocByteSize + 1];
2089    final long timestamp = EnvironmentEdgeManager.currentTime();
2090    final long seqId = 100;
2091    final byte[] rowKey1 = Bytes.toBytes("rowKey1");
2092    final Cell originalCell1 = createCell(rowKey1, qf1, timestamp, seqId, cellValue);
2093    final byte[] rowKey2 = Bytes.toBytes("rowKey2");
2094    final Cell originalCell2 = createCell(rowKey2, qf1, timestamp, seqId, cellValue);
2095    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2096    quals.add(qf1);
2097
2098    int cellByteSize = MutableSegment.getCellLength(originalCell1);
2099    int inMemoryFlushByteSize = cellByteSize - 1;
2100
2101    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2102    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
2103    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2104    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(inMemoryFlushByteSize * 200));
2105    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2106
2107    // Use {@link MemoryCompactionPolicy#EAGER} for always compacting.
2108    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2109      .setInMemoryCompaction(MemoryCompactionPolicy.EAGER).build());
2110
2111    MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
2112    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == inMemoryFlushByteSize);
2113
2114    // Data chunk Pool is disabled.
2115    assertTrue(ChunkCreator.getInstance().getMaxCount(ChunkType.DATA_CHUNK) == 0);
2116
2117    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2118
2119    // First compact
2120    store.add(originalCell1, memStoreSizing);
2121    // Waiting for the first in-memory compaction finished
2122    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2123
2124    StoreScanner storeScanner =
2125      (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
2126    SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2127    Cell resultCell1 = segmentScanner.next();
2128    assertTrue(CellUtil.equals(resultCell1, originalCell1));
2129    int cell1ChunkId = ((ExtendedCell) resultCell1).getChunkId();
2130    assertTrue(cell1ChunkId != ExtendedCell.CELL_NOT_BASED_ON_CHUNK);
2131    assertNull(segmentScanner.next());
2132    segmentScanner.close();
2133    storeScanner.close();
2134    Segment segment = segmentScanner.segment;
2135    assertTrue(segment instanceof CellChunkImmutableSegment);
2136    MemStoreLABImpl memStoreLAB1 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2137    assertTrue(!memStoreLAB1.isClosed());
2138    assertTrue(!memStoreLAB1.chunks.isEmpty());
2139    assertTrue(!memStoreLAB1.isReclaimed());
2140
2141    // Second compact
2142    store.add(originalCell2, memStoreSizing);
2143    // Waiting for the second in-memory compaction finished
2144    myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2145
2146    // Before HBASE-27464, here may throw java.lang.IllegalArgumentException: In CellChunkMap, cell
2147    // must be associated with chunk.. We were looking for a cell at index 0.
2148    // The cause for this exception is because the data chunk Pool is disabled,when the data chunks
2149    // are recycled after the second in-memory compaction finished,the
2150    // {@link ChunkCreator.putbackChunks} method does not put the chunks back to the data chunk
2151    // pool,it just removes them from {@link ChunkCreator#chunkIdMap},so in
2152    // {@link CellChunkMap#getCell} we could not get the data chunk by chunkId.
2153    storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
2154    segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2155    Cell newResultCell1 = segmentScanner.next();
2156    assertTrue(newResultCell1 != resultCell1);
2157    assertTrue(CellUtil.equals(newResultCell1, originalCell1));
2158
2159    Cell resultCell2 = segmentScanner.next();
2160    assertTrue(CellUtil.equals(resultCell2, originalCell2));
2161    assertNull(segmentScanner.next());
2162    segmentScanner.close();
2163    storeScanner.close();
2164
2165    segment = segmentScanner.segment;
2166    assertTrue(segment instanceof CellChunkImmutableSegment);
2167    MemStoreLABImpl memStoreLAB2 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2168    assertTrue(!memStoreLAB2.isClosed());
2169    assertTrue(!memStoreLAB2.chunks.isEmpty());
2170    assertTrue(!memStoreLAB2.isReclaimed());
2171    assertTrue(memStoreLAB1.isClosed());
2172    assertTrue(memStoreLAB1.chunks.isEmpty());
2173    assertTrue(memStoreLAB1.isReclaimed());
2174  }
2175
2176  // This test is for HBASE-26210 also, test write large cell and small cell concurrently when
2177  // InmemoryFlushSize is smaller,equal with and larger than cell size.
2178  @Test
2179  public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently()
2180    throws IOException, InterruptedException {
2181    doWriteTestLargeCellAndSmallCellConcurrently(
2182      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1);
2183    doWriteTestLargeCellAndSmallCellConcurrently(
2184      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize);
2185    doWriteTestLargeCellAndSmallCellConcurrently(
2186      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1);
2187    doWriteTestLargeCellAndSmallCellConcurrently(
2188      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize);
2189    doWriteTestLargeCellAndSmallCellConcurrently(
2190      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1);
2191  }
2192
2193  private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize)
2194    throws IOException, InterruptedException {
2195
2196    Configuration conf = HBaseConfiguration.create();
2197
2198    byte[] smallValue = new byte[3];
2199    byte[] largeValue = new byte[100];
2200    final long timestamp = EnvironmentEdgeManager.currentTime();
2201    final long seqId = 100;
2202    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2203    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2204    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2205    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2206    int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize);
2207    boolean flushByteSizeLessThanSmallAndLargeCellSize =
2208      flushByteSize < (smallCellByteSize + largeCellByteSize);
2209
2210    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName());
2211    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2212    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2213
2214    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2215      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2216
2217    MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore);
2218    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2219    myCompactingMemStore.disableCompaction();
2220    if (flushByteSizeLessThanSmallAndLargeCellSize) {
2221      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true;
2222    } else {
2223      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false;
2224    }
2225
2226    final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
2227    final AtomicLong totalCellByteSize = new AtomicLong(0);
2228    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2229    Thread smallCellThread = new Thread(() -> {
2230      try {
2231        for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
2232          long currentTimestamp = timestamp + i;
2233          Cell cell = createCell(qf1, currentTimestamp, seqId, smallValue);
2234          totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
2235          store.add(cell, memStoreSizing);
2236        }
2237      } catch (Throwable exception) {
2238        exceptionRef.set(exception);
2239
2240      }
2241    });
2242    smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME);
2243    smallCellThread.start();
2244
2245    String oldThreadName = Thread.currentThread().getName();
2246    try {
2247      /**
2248       * When flushByteSizeLessThanSmallAndLargeCellSize is true:
2249       * </p>
2250       * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then
2251       * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then
2252       * largeCellThread invokes flushInMemory.
2253       * <p/>
2254       * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
2255       * can run into MyCompactingMemStore3.checkAndAddToActiveSize again.
2256       * <p/>
2257       * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and
2258       * largeCellThread concurrently write one cell and wait each other, and then write another
2259       * cell etc.
2260       */
2261      Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME);
2262      for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
2263        long currentTimestamp = timestamp + i;
2264        Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
2265        totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
2266        store.add(cell, memStoreSizing);
2267      }
2268      smallCellThread.join();
2269
2270      assertTrue(exceptionRef.get() == null);
2271      assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2));
2272      assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get());
2273      if (flushByteSizeLessThanSmallAndLargeCellSize) {
2274        assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT);
2275      } else {
2276        assertTrue(
2277          myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1));
2278      }
2279    } finally {
2280      Thread.currentThread().setName(oldThreadName);
2281    }
2282  }
2283
2284  /**
2285   * <pre>
2286    * This test is for HBASE-26384,
2287   * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()}
2288   * execute concurrently.
2289   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
2290   * for both branch-2 and master):
2291   * 1. The {@link CompactingMemStore} size exceeds
2292   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
2293   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
2294   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
2295   * 2. The in memory compact thread starts and then stopping before
2296   *    {@link CompactingMemStore#flattenOneSegment}.
2297   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
2298   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
2299   *    compact thread continues.
2300   *    Assuming {@link VersionedSegmentsList#version} returned from
2301   *    {@link CompactingMemStore#getImmutableSegments} is v.
2302   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
2303   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2304   *    {@link CompactionPipeline#version} is still v.
2305   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2306   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2307   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
2308   *    {@link CompactionPipeline} has changed because
2309   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
2310   *    removed in fact and still remaining in {@link CompactionPipeline}.
2311   *
2312   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior:
2313   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2314   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
2315   *    v+1.
2316   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2317   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2318   *    failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
2319   *    again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
2320   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.
2321   * </pre>
2322   */
2323  @Test
2324  public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception {
2325    Configuration conf = HBaseConfiguration.create();
2326
2327    byte[] smallValue = new byte[3];
2328    byte[] largeValue = new byte[9];
2329    final long timestamp = EnvironmentEdgeManager.currentTime();
2330    final long seqId = 100;
2331    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2332    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2333    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2334    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2335    int totalCellByteSize = (smallCellByteSize + largeCellByteSize);
2336    int flushByteSize = totalCellByteSize - 2;
2337
2338    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2339    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName());
2340    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2341    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2342
2343    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2344      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2345
2346    MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore);
2347    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2348
2349    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2350    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2351
2352    String oldThreadName = Thread.currentThread().getName();
2353    try {
2354      Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME);
2355      /**
2356       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
2357       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
2358       * would invoke {@link CompactingMemStore#stopCompaction}.
2359       */
2360      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2361
2362      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2363      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2364
2365      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2366      assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize);
2367      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2368      assertTrue(segments.getNumOfSegments() == 0);
2369      assertTrue(segments.getNumOfCells() == 0);
2370      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1);
2371      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2372    } finally {
2373      Thread.currentThread().setName(oldThreadName);
2374    }
2375  }
2376
2377  /**
2378   * <pre>
2379   * This test is for HBASE-26384,
2380   * test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()}
2381   * and writeMemStore execute concurrently.
2382   * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs
2383   * for both branch-2 and master):
2384   * 1. The {@link CompactingMemStore} size exceeds
2385   *    {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new
2386   *    {@link ImmutableSegment}  to the head of {@link CompactingMemStore#pipeline},and start a
2387   *    in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}.
2388   * 2. The in memory compact thread starts and then stopping before
2389   *    {@link CompactingMemStore#flattenOneSegment}.
2390   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the
2391   *    snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory
2392   *    compact thread continues.
2393   *    Assuming {@link VersionedSegmentsList#version} returned from
2394   *    {@link CompactingMemStore#getImmutableSegments} is v.
2395   * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}.
2396   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2397   *    {@link CompactionPipeline#version} is still v.
2398   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2399   *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
2400   *    thinks it is successful and continue flushing,but the {@link ImmutableSegment} in
2401   *    {@link CompactionPipeline} has changed because
2402   *    {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not
2403   *    removed in fact and still remaining in {@link CompactionPipeline}.
2404   *
2405   * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior,
2406   * and I add step 7-8 to test there is new segment added before retry.
2407   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
2408   *    {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to
2409   *     v+1.
2410   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2411   *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
2412   *    failed and retry,{@link VersionedSegmentsList#version} returned from
2413   *    {@link CompactingMemStore#getImmutableSegments} is v+1.
2414   * 7. The write thread continues writing to {@link CompactingMemStore} and
2415   *    {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
2416   *    {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
2417   *    {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
2418   *    {@link CompactionPipeline#version} is still v+1.
2419   * 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
2420   *    {@link CompactionPipeline#version} is still v+1,
2421   *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment}
2422   *    remained at the head of {@link CompactingMemStore#pipeline},the old is removed by
2423   *    {@link CompactingMemStore#swapPipelineWithNull}.
2424   * </pre>
2425   */
2426  @Test
2427  public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception {
2428    Configuration conf = HBaseConfiguration.create();
2429
2430    byte[] smallValue = new byte[3];
2431    byte[] largeValue = new byte[9];
2432    final long timestamp = EnvironmentEdgeManager.currentTime();
2433    final long seqId = 100;
2434    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2435    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2436    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
2437    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
2438    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2439    int flushByteSize = firstWriteCellByteSize - 2;
2440
2441    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2442    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName());
2443    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2444    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2445
2446    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2447      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2448
2449    final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore);
2450    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2451
2452    store.add(smallCell, new NonThreadSafeMemStoreSizing());
2453    store.add(largeCell, new NonThreadSafeMemStoreSizing());
2454
2455    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2456    final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue);
2457    final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2458    final int writeAgainCellByteSize =
2459      MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2);
2460    final Thread writeAgainThread = new Thread(() -> {
2461      try {
2462        myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await();
2463
2464        store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing());
2465        store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing());
2466
2467        myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await();
2468      } catch (Throwable exception) {
2469        exceptionRef.set(exception);
2470      }
2471    });
2472    writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME);
2473    writeAgainThread.start();
2474
2475    String oldThreadName = Thread.currentThread().getName();
2476    try {
2477      Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME);
2478      /**
2479       * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters
2480       * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot}
2481       * would invoke {@link CompactingMemStore#stopCompaction}.
2482       */
2483      myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await();
2484      MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot();
2485      myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
2486      writeAgainThread.join();
2487
2488      assertTrue(memStoreSnapshot.getCellsCount() == 2);
2489      assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize);
2490      VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments();
2491      assertTrue(segments.getNumOfSegments() == 1);
2492      assertTrue(
2493        ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize);
2494      assertTrue(segments.getNumOfCells() == 2);
2495      assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2);
2496      assertTrue(exceptionRef.get() == null);
2497      assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2);
2498    } finally {
2499      Thread.currentThread().setName(oldThreadName);
2500    }
2501  }
2502
2503  /**
2504   * <pre>
2505   * This test is for HBASE-26465,
2506   * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute
2507   * concurrently. The threads sequence before HBASE-26465 is:
2508   * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have be added to
2509   *  {@link DefaultMemStore}.
2510   * 2.The flush thread stopping before {@link DefaultMemStore#clearSnapshot} in
2511   *   {@link HStore#updateStorefiles} after completed flushing memStore to hfile.
2512   * 3.The scan thread starts and stopping after {@link DefaultMemStore#getSnapshotSegments} in
2513   *   {@link DefaultMemStore#getScanners},here the scan thread gets the
2514   *   {@link DefaultMemStore#snapshot} which is created by the flush thread.
2515   * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and close
2516   *   {@link DefaultMemStore#snapshot},because the reference count of the corresponding
2517   *   {@link MemStoreLABImpl} is 0, the {@link Chunk}s in corresponding {@link MemStoreLABImpl}
2518   *   are recycled.
2519   * 5.The scan thread continues {@link DefaultMemStore#getScanners},and create a
2520   *   {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increase the
2521   *   reference count of the corresponding {@link MemStoreLABImpl}, but {@link Chunk}s in
2522   *   corresponding {@link MemStoreLABImpl} are recycled by step 4, and these {@link Chunk}s may
2523   *   be overwritten by other write threads,which may cause serious problem.
2524   * After HBASE-26465,{@link DefaultMemStore#getScanners} and
2525   * {@link DefaultMemStore#clearSnapshot} could not execute concurrently.
2526   * </pre>
2527   */
2528  @Test
2529  public void testClearSnapshotGetScannerConcurrently() throws Exception {
2530    Configuration conf = HBaseConfiguration.create();
2531
2532    byte[] smallValue = new byte[3];
2533    byte[] largeValue = new byte[9];
2534    final long timestamp = EnvironmentEdgeManager.currentTime();
2535    final long seqId = 100;
2536    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2537    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2538    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2539    quals.add(qf1);
2540    quals.add(qf2);
2541
2542    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName());
2543    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2544
2545    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2546    MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore);
2547    myDefaultMemStore.store = store;
2548
2549    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2550    store.add(smallCell, memStoreSizing);
2551    store.add(largeCell, memStoreSizing);
2552
2553    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
2554    final Thread flushThread = new Thread(() -> {
2555      try {
2556        flushStore(store, id++);
2557      } catch (Throwable exception) {
2558        exceptionRef.set(exception);
2559      }
2560    });
2561    flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME);
2562    flushThread.start();
2563
2564    String oldThreadName = Thread.currentThread().getName();
2565    StoreScanner storeScanner = null;
2566    try {
2567      Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME);
2568
2569      /**
2570       * Wait flush thread stopping before {@link DefaultMemStore#doClearSnapshot}
2571       */
2572      myDefaultMemStore.getScannerCyclicBarrier.await();
2573
2574      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2575      flushThread.join();
2576
2577      if (myDefaultMemStore.shouldWait) {
2578        SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
2579        MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
2580        assertTrue(memStoreLAB.isClosed());
2581        assertTrue(!memStoreLAB.chunks.isEmpty());
2582        assertTrue(!memStoreLAB.isReclaimed());
2583
2584        Cell cell1 = segmentScanner.next();
2585        CellUtil.equals(smallCell, cell1);
2586        Cell cell2 = segmentScanner.next();
2587        CellUtil.equals(largeCell, cell2);
2588        assertNull(segmentScanner.next());
2589      } else {
2590        List<Cell> results = new ArrayList<>();
2591        storeScanner.next(results);
2592        assertEquals(2, results.size());
2593        CellUtil.equals(smallCell, results.get(0));
2594        CellUtil.equals(largeCell, results.get(1));
2595      }
2596      assertTrue(exceptionRef.get() == null);
2597    } finally {
2598      if (storeScanner != null) {
2599        storeScanner.close();
2600      }
2601      Thread.currentThread().setName(oldThreadName);
2602    }
2603  }
2604
2605  @SuppressWarnings("unchecked")
2606  private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) {
2607    List<T> resultScanners = new ArrayList<T>();
2608    for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
2609      if (keyValueScannerClass.isInstance(keyValueScanner)) {
2610        resultScanners.add((T) keyValueScanner);
2611      }
2612    }
2613    assertTrue(resultScanners.size() == 1);
2614    return resultScanners.get(0);
2615  }
2616
2617  @Test
2618  public void testOnConfigurationChange() throws IOException {
2619    final int COMMON_MAX_FILES_TO_COMPACT = 10;
2620    final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8;
2621    final int STORE_MAX_FILES_TO_COMPACT = 6;
2622
2623    // Build a table that its maxFileToCompact different from common configuration.
2624    Configuration conf = HBaseConfiguration.create();
2625    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2626      COMMON_MAX_FILES_TO_COMPACT);
2627    conf.setBoolean(CACHE_DATA_ON_READ_KEY, false);
2628    conf.setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true);
2629    conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
2630    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
2631      .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2632        String.valueOf(STORE_MAX_FILES_TO_COMPACT))
2633      .build();
2634    init(this.name.getMethodName(), conf, hcd);
2635
2636    // After updating common configuration, the conf in HStore itself must not be changed.
2637    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY,
2638      NEW_COMMON_MAX_FILES_TO_COMPACT);
2639    this.store.onConfigurationChange(conf);
2640
2641    assertEquals(STORE_MAX_FILES_TO_COMPACT,
2642      store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact());
2643
2644    assertEquals(conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ), false);
2645    assertEquals(conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), true);
2646    assertEquals(conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), true);
2647
2648    // reset to default values
2649    conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ);
2650    conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE);
2651    conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE);
2652    this.store.onConfigurationChange(conf);
2653  }
2654
2655  /**
2656   * This test is for HBASE-26476
2657   */
2658  @Test
2659  public void testExtendsDefaultMemStore() throws Exception {
2660    Configuration conf = HBaseConfiguration.create();
2661    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2662
2663    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2664    assertTrue(this.store.memstore.getClass() == DefaultMemStore.class);
2665    tearDown();
2666
2667    conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName());
2668    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2669    assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class);
2670  }
2671
2672  static class CustomDefaultMemStore extends DefaultMemStore {
2673
2674    public CustomDefaultMemStore(Configuration conf, CellComparator c,
2675      RegionServicesForStores regionServices) {
2676      super(conf, c, regionServices);
2677    }
2678
2679  }
2680
2681  /**
2682   * This test is for HBASE-26488
2683   */
2684  @Test
2685  public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception {
2686
2687    Configuration conf = HBaseConfiguration.create();
2688
2689    byte[] smallValue = new byte[3];
2690    byte[] largeValue = new byte[9];
2691    final long timestamp = EnvironmentEdgeManager.currentTime();
2692    final long seqId = 100;
2693    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
2694    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
2695    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2696    quals.add(qf1);
2697    quals.add(qf2);
2698
2699    conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName());
2700    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2701    conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
2702      MyDefaultStoreFlusher.class.getName());
2703
2704    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
2705    MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore);
2706    assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher);
2707
2708    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2709    store.add(smallCell, memStoreSizing);
2710    store.add(largeCell, memStoreSizing);
2711    flushStore(store, id++);
2712
2713    MemStoreLABImpl memStoreLAB =
2714      (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
2715    assertTrue(memStoreLAB.isClosed());
2716    assertTrue(memStoreLAB.getRefCntValue() == 0);
2717    assertTrue(memStoreLAB.isReclaimed());
2718    assertTrue(memStoreLAB.chunks.isEmpty());
2719    StoreScanner storeScanner = null;
2720    try {
2721      storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
2722      assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1);
2723      assertTrue(store.memstore.size().getCellsCount() == 0);
2724      assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0);
2725      assertTrue(storeScanner.currentScanners.size() == 1);
2726      assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner);
2727
2728      List<Cell> results = new ArrayList<>();
2729      storeScanner.next(results);
2730      assertEquals(2, results.size());
2731      CellUtil.equals(smallCell, results.get(0));
2732      CellUtil.equals(largeCell, results.get(1));
2733    } finally {
2734      if (storeScanner != null) {
2735        storeScanner.close();
2736      }
2737    }
2738  }
2739
2740  static class MyDefaultMemStore1 extends DefaultMemStore {
2741
2742    private ImmutableSegment snapshotImmutableSegment;
2743
2744    public MyDefaultMemStore1(Configuration conf, CellComparator c,
2745      RegionServicesForStores regionServices) {
2746      super(conf, c, regionServices);
2747    }
2748
2749    @Override
2750    public MemStoreSnapshot snapshot() {
2751      MemStoreSnapshot result = super.snapshot();
2752      this.snapshotImmutableSegment = snapshot;
2753      return result;
2754    }
2755
2756  }
2757
2758  public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
2759    private static final AtomicInteger failCounter = new AtomicInteger(1);
2760    private static final AtomicInteger counter = new AtomicInteger(0);
2761
2762    public MyDefaultStoreFlusher(Configuration conf, HStore store) {
2763      super(conf, store);
2764    }
2765
2766    @Override
2767    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
2768      MonitoredTask status, ThroughputController throughputController,
2769      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
2770      counter.incrementAndGet();
2771      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
2772        writerCreationTracker);
2773    }
2774
2775    @Override
2776    protected void performFlush(InternalScanner scanner, final CellSink sink,
2777      ThroughputController throughputController) throws IOException {
2778
2779      final int currentCount = counter.get();
2780      CellSink newCellSink = (cell) -> {
2781        if (currentCount <= failCounter.get()) {
2782          throw new IOException("Simulated exception by tests");
2783        }
2784        sink.append(cell);
2785      };
2786      super.performFlush(scanner, newCellSink, throughputController);
2787    }
2788  }
2789
2790  /**
2791   * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB}
2792   */
2793  @Test
2794  public void testImmutableMemStoreLABRefCnt() throws Exception {
2795    Configuration conf = HBaseConfiguration.create();
2796
2797    byte[] smallValue = new byte[3];
2798    byte[] largeValue = new byte[9];
2799    final long timestamp = EnvironmentEdgeManager.currentTime();
2800    final long seqId = 100;
2801    final Cell smallCell1 = createCell(qf1, timestamp, seqId, smallValue);
2802    final Cell largeCell1 = createCell(qf2, timestamp, seqId, largeValue);
2803    final Cell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue);
2804    final Cell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue);
2805    final Cell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue);
2806    final Cell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue);
2807
2808    int smallCellByteSize = MutableSegment.getCellLength(smallCell1);
2809    int largeCellByteSize = MutableSegment.getCellLength(largeCell1);
2810    int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize);
2811    int flushByteSize = firstWriteCellByteSize - 2;
2812
2813    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
2814    conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
2815    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
2816    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
2817    conf.setBoolean(WALFactory.WAL_ENABLED, false);
2818
2819    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
2820      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
2821
2822    final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore);
2823    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
2824    myCompactingMemStore.allowCompaction.set(false);
2825
2826    NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
2827    store.add(smallCell1, memStoreSizing);
2828    store.add(largeCell1, memStoreSizing);
2829    store.add(smallCell2, memStoreSizing);
2830    store.add(largeCell2, memStoreSizing);
2831    store.add(smallCell3, memStoreSizing);
2832    store.add(largeCell3, memStoreSizing);
2833    VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
2834    assertTrue(versionedSegmentsList.getNumOfSegments() == 3);
2835    List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments();
2836    List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size());
2837    for (ImmutableSegment segment : segments) {
2838      memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB());
2839    }
2840    List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
2841    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2842      assertTrue(memStoreLAB.getRefCntValue() == 2);
2843    }
2844
2845    myCompactingMemStore.allowCompaction.set(true);
2846    myCompactingMemStore.flushInMemory();
2847
2848    versionedSegmentsList = myCompactingMemStore.getImmutableSegments();
2849    assertTrue(versionedSegmentsList.getNumOfSegments() == 1);
2850    ImmutableMemStoreLAB immutableMemStoreLAB =
2851      (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB());
2852    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2853      assertTrue(memStoreLAB.getRefCntValue() == 2);
2854    }
2855
2856    List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE);
2857    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2858      assertTrue(memStoreLAB.getRefCntValue() == 2);
2859    }
2860    assertTrue(immutableMemStoreLAB.getRefCntValue() == 2);
2861    for (KeyValueScanner scanner : scanners1) {
2862      scanner.close();
2863    }
2864    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2865      assertTrue(memStoreLAB.getRefCntValue() == 1);
2866    }
2867    for (KeyValueScanner scanner : scanners2) {
2868      scanner.close();
2869    }
2870    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2871      assertTrue(memStoreLAB.getRefCntValue() == 1);
2872    }
2873    assertTrue(immutableMemStoreLAB.getRefCntValue() == 1);
2874    flushStore(store, id++);
2875    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2876      assertTrue(memStoreLAB.getRefCntValue() == 0);
2877    }
2878    assertTrue(immutableMemStoreLAB.getRefCntValue() == 0);
2879    assertTrue(immutableMemStoreLAB.isClosed());
2880    for (MemStoreLABImpl memStoreLAB : memStoreLABs) {
2881      assertTrue(memStoreLAB.isClosed());
2882      assertTrue(memStoreLAB.isReclaimed());
2883      assertTrue(memStoreLAB.chunks.isEmpty());
2884    }
2885  }
2886
2887  private HStoreFile mockStoreFileWithLength(long length) {
2888    HStoreFile sf = mock(HStoreFile.class);
2889    StoreFileReader sfr = mock(StoreFileReader.class);
2890    when(sf.isHFile()).thenReturn(true);
2891    when(sf.getReader()).thenReturn(sfr);
2892    when(sfr.length()).thenReturn(length);
2893    return sf;
2894  }
2895
2896  private static class MyThread extends Thread {
2897    private StoreScanner scanner;
2898    private KeyValueHeap heap;
2899
2900    public MyThread(StoreScanner scanner) {
2901      this.scanner = scanner;
2902    }
2903
2904    public KeyValueHeap getHeap() {
2905      return this.heap;
2906    }
2907
2908    @Override
2909    public void run() {
2910      scanner.trySwitchToStreamRead();
2911      heap = scanner.heap;
2912    }
2913  }
2914
2915  private static class MyMemStoreCompactor extends MemStoreCompactor {
2916    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2917    private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
2918
2919    public MyMemStoreCompactor(CompactingMemStore compactingMemStore,
2920      MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
2921      super(compactingMemStore, compactionPolicy);
2922    }
2923
2924    @Override
2925    public boolean start() throws IOException {
2926      boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
2927      if (isFirst) {
2928        try {
2929          START_COMPACTOR_LATCH.await();
2930          return super.start();
2931        } catch (InterruptedException ex) {
2932          throw new RuntimeException(ex);
2933        }
2934      }
2935      return super.start();
2936    }
2937  }
2938
2939  public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
2940    private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
2941
2942    public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c,
2943      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2944      throws IOException {
2945      super(conf, c, store, regionServices, compactionPolicy);
2946    }
2947
2948    @Override
2949    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
2950      throws IllegalArgumentIOException {
2951      return new MyMemStoreCompactor(this, compactionPolicy);
2952    }
2953
2954    @Override
2955    protected boolean setInMemoryCompactionFlag() {
2956      boolean rval = super.setInMemoryCompactionFlag();
2957      if (rval) {
2958        RUNNER_COUNT.incrementAndGet();
2959        if (LOG.isDebugEnabled()) {
2960          LOG.debug("runner count: " + RUNNER_COUNT.get());
2961        }
2962      }
2963      return rval;
2964    }
2965  }
2966
2967  public static class MyCompactingMemStore extends CompactingMemStore {
2968    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
2969    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
2970    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
2971
2972    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store,
2973      RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
2974      throws IOException {
2975      super(conf, c, store, regionServices, compactionPolicy);
2976    }
2977
2978    @Override
2979    protected List<KeyValueScanner> createList(int capacity) {
2980      if (START_TEST.get()) {
2981        try {
2982          getScannerLatch.countDown();
2983          snapshotLatch.await();
2984        } catch (InterruptedException e) {
2985          throw new RuntimeException(e);
2986        }
2987      }
2988      return new ArrayList<>(capacity);
2989    }
2990
2991    @Override
2992    protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) {
2993      if (START_TEST.get()) {
2994        try {
2995          getScannerLatch.await();
2996        } catch (InterruptedException e) {
2997          throw new RuntimeException(e);
2998        }
2999      }
3000
3001      super.pushActiveToPipeline(active, checkEmpty);
3002      if (START_TEST.get()) {
3003        snapshotLatch.countDown();
3004      }
3005    }
3006  }
3007
3008  interface MyListHook {
3009    void hook(int currentSize);
3010  }
3011
3012  private static class MyList<T> implements List<T> {
3013    private final List<T> delegatee = new ArrayList<>();
3014    private final MyListHook hookAtAdd;
3015
3016    MyList(final MyListHook hookAtAdd) {
3017      this.hookAtAdd = hookAtAdd;
3018    }
3019
3020    @Override
3021    public int size() {
3022      return delegatee.size();
3023    }
3024
3025    @Override
3026    public boolean isEmpty() {
3027      return delegatee.isEmpty();
3028    }
3029
3030    @Override
3031    public boolean contains(Object o) {
3032      return delegatee.contains(o);
3033    }
3034
3035    @Override
3036    public Iterator<T> iterator() {
3037      return delegatee.iterator();
3038    }
3039
3040    @Override
3041    public Object[] toArray() {
3042      return delegatee.toArray();
3043    }
3044
3045    @Override
3046    public <R> R[] toArray(R[] a) {
3047      return delegatee.toArray(a);
3048    }
3049
3050    @Override
3051    public boolean add(T e) {
3052      hookAtAdd.hook(size());
3053      return delegatee.add(e);
3054    }
3055
3056    @Override
3057    public boolean remove(Object o) {
3058      return delegatee.remove(o);
3059    }
3060
3061    @Override
3062    public boolean containsAll(Collection<?> c) {
3063      return delegatee.containsAll(c);
3064    }
3065
3066    @Override
3067    public boolean addAll(Collection<? extends T> c) {
3068      return delegatee.addAll(c);
3069    }
3070
3071    @Override
3072    public boolean addAll(int index, Collection<? extends T> c) {
3073      return delegatee.addAll(index, c);
3074    }
3075
3076    @Override
3077    public boolean removeAll(Collection<?> c) {
3078      return delegatee.removeAll(c);
3079    }
3080
3081    @Override
3082    public boolean retainAll(Collection<?> c) {
3083      return delegatee.retainAll(c);
3084    }
3085
3086    @Override
3087    public void clear() {
3088      delegatee.clear();
3089    }
3090
3091    @Override
3092    public T get(int index) {
3093      return delegatee.get(index);
3094    }
3095
3096    @Override
3097    public T set(int index, T element) {
3098      return delegatee.set(index, element);
3099    }
3100
3101    @Override
3102    public void add(int index, T element) {
3103      delegatee.add(index, element);
3104    }
3105
3106    @Override
3107    public T remove(int index) {
3108      return delegatee.remove(index);
3109    }
3110
3111    @Override
3112    public int indexOf(Object o) {
3113      return delegatee.indexOf(o);
3114    }
3115
3116    @Override
3117    public int lastIndexOf(Object o) {
3118      return delegatee.lastIndexOf(o);
3119    }
3120
3121    @Override
3122    public ListIterator<T> listIterator() {
3123      return delegatee.listIterator();
3124    }
3125
3126    @Override
3127    public ListIterator<T> listIterator(int index) {
3128      return delegatee.listIterator(index);
3129    }
3130
3131    @Override
3132    public List<T> subList(int fromIndex, int toIndex) {
3133      return delegatee.subList(fromIndex, toIndex);
3134    }
3135  }
3136
3137  public static class MyCompactingMemStore2 extends CompactingMemStore {
3138    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
3139    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
3140    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
3141    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
3142    private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
3143    private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
3144
3145    public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
3146      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3147      throws IOException {
3148      super(conf, cellComparator, store, regionServices, compactionPolicy);
3149    }
3150
3151    @Override
3152    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
3153      MemStoreSizing memstoreSizing) {
3154      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3155        int currentCount = largeCellPreUpdateCounter.incrementAndGet();
3156        if (currentCount <= 1) {
3157          try {
3158            /**
3159             * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
3160             * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
3161             * largeCellThread invokes flushInMemory.
3162             */
3163            preCyclicBarrier.await();
3164          } catch (Throwable e) {
3165            throw new RuntimeException(e);
3166          }
3167        }
3168      }
3169
3170      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3171      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3172        try {
3173          preCyclicBarrier.await();
3174        } catch (Throwable e) {
3175          throw new RuntimeException(e);
3176        }
3177      }
3178      return returnValue;
3179    }
3180
3181    @Override
3182    protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
3183      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3184        try {
3185          /**
3186           * After largeCellThread finished flushInMemory method, smallCellThread can add cell to
3187           * currentActive . That is to say when largeCellThread called flushInMemory method,
3188           * currentActive has no cell.
3189           */
3190          postCyclicBarrier.await();
3191        } catch (Throwable e) {
3192          throw new RuntimeException(e);
3193        }
3194      }
3195      super.doAdd(currentActive, cell, memstoreSizing);
3196    }
3197
3198    @Override
3199    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
3200      super.flushInMemory(currentActiveMutableSegment);
3201      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3202        if (largeCellPreUpdateCounter.get() <= 1) {
3203          try {
3204            postCyclicBarrier.await();
3205          } catch (Throwable e) {
3206            throw new RuntimeException(e);
3207          }
3208        }
3209      }
3210    }
3211
3212  }
3213
3214  public static class MyCompactingMemStore3 extends CompactingMemStore {
3215    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
3216    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
3217
3218    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
3219    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
3220    private final AtomicInteger flushCounter = new AtomicInteger(0);
3221    private static final int CELL_COUNT = 5;
3222    private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;
3223
3224    public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator,
3225      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3226      throws IOException {
3227      super(conf, cellComparator, store, regionServices, compactionPolicy);
3228    }
3229
3230    @Override
3231    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
3232      MemStoreSizing memstoreSizing) {
3233      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3234        return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3235      }
3236      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
3237        try {
3238          preCyclicBarrier.await();
3239        } catch (Throwable e) {
3240          throw new RuntimeException(e);
3241        }
3242      }
3243
3244      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
3245      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3246        try {
3247          preCyclicBarrier.await();
3248        } catch (Throwable e) {
3249          throw new RuntimeException(e);
3250        }
3251      }
3252      return returnValue;
3253    }
3254
3255    @Override
3256    protected void postUpdate(MutableSegment currentActiveMutableSegment) {
3257      super.postUpdate(currentActiveMutableSegment);
3258      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3259        try {
3260          postCyclicBarrier.await();
3261        } catch (Throwable e) {
3262          throw new RuntimeException(e);
3263        }
3264        return;
3265      }
3266
3267      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
3268        try {
3269          postCyclicBarrier.await();
3270        } catch (Throwable e) {
3271          throw new RuntimeException(e);
3272        }
3273      }
3274    }
3275
3276    @Override
3277    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
3278      super.flushInMemory(currentActiveMutableSegment);
3279      flushCounter.incrementAndGet();
3280      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
3281        return;
3282      }
3283
3284      assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME));
3285      try {
3286        postCyclicBarrier.await();
3287      } catch (Throwable e) {
3288        throw new RuntimeException(e);
3289      }
3290
3291    }
3292
3293    void disableCompaction() {
3294      allowCompaction.set(false);
3295    }
3296
3297    void enableCompaction() {
3298      allowCompaction.set(true);
3299    }
3300
3301  }
3302
3303  public static class MyCompactingMemStore4 extends CompactingMemStore {
3304    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
3305    /**
3306     * {@link CompactingMemStore#flattenOneSegment} must execute after
3307     * {@link CompactingMemStore#getImmutableSegments}
3308     */
3309    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
3310    /**
3311     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
3312     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
3313     */
3314    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
3315    /**
3316     * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
3317     * snapshot thread starts {@link CompactingMemStore#snapshot},because
3318     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
3319     */
3320    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
3321    /**
3322     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
3323     */
3324    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3325    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3326    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3327    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3328    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
3329
3330    public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
3331      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3332      throws IOException {
3333      super(conf, cellComparator, store, regionServices, compactionPolicy);
3334    }
3335
3336    @Override
3337    public VersionedSegmentsList getImmutableSegments() {
3338      VersionedSegmentsList result = super.getImmutableSegments();
3339      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3340        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3341        if (currentCount <= 1) {
3342          try {
3343            flattenOneSegmentPreCyclicBarrier.await();
3344          } catch (Throwable e) {
3345            throw new RuntimeException(e);
3346          }
3347        }
3348      }
3349      return result;
3350    }
3351
3352    @Override
3353    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3354      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3355        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3356        if (currentCount <= 1) {
3357          try {
3358            flattenOneSegmentPostCyclicBarrier.await();
3359          } catch (Throwable e) {
3360            throw new RuntimeException(e);
3361          }
3362        }
3363      }
3364      boolean result = super.swapPipelineWithNull(segments);
3365      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3366        int currentCount = swapPipelineWithNullCounter.get();
3367        if (currentCount <= 1) {
3368          assertTrue(!result);
3369        }
3370        if (currentCount == 2) {
3371          assertTrue(result);
3372        }
3373      }
3374      return result;
3375
3376    }
3377
3378    @Override
3379    public void flattenOneSegment(long requesterVersion, Action action) {
3380      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3381      if (currentCount <= 1) {
3382        try {
3383          /**
3384           * {@link CompactingMemStore#snapshot} could start.
3385           */
3386          snapShotStartCyclicCyclicBarrier.await();
3387          flattenOneSegmentPreCyclicBarrier.await();
3388        } catch (Throwable e) {
3389          throw new RuntimeException(e);
3390        }
3391      }
3392      super.flattenOneSegment(requesterVersion, action);
3393      if (currentCount <= 1) {
3394        try {
3395          flattenOneSegmentPostCyclicBarrier.await();
3396        } catch (Throwable e) {
3397          throw new RuntimeException(e);
3398        }
3399      }
3400    }
3401
3402    @Override
3403    protected boolean setInMemoryCompactionFlag() {
3404      boolean result = super.setInMemoryCompactionFlag();
3405      assertTrue(result);
3406      setInMemoryCompactionFlagCounter.incrementAndGet();
3407      return result;
3408    }
3409
3410    @Override
3411    void inMemoryCompaction() {
3412      try {
3413        super.inMemoryCompaction();
3414      } finally {
3415        try {
3416          inMemoryCompactionEndCyclicBarrier.await();
3417        } catch (Throwable e) {
3418          throw new RuntimeException(e);
3419        }
3420
3421      }
3422    }
3423
3424  }
3425
3426  public static class MyCompactingMemStore5 extends CompactingMemStore {
3427    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
3428    private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
3429    /**
3430     * {@link CompactingMemStore#flattenOneSegment} must execute after
3431     * {@link CompactingMemStore#getImmutableSegments}
3432     */
3433    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
3434    /**
3435     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
3436     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
3437     */
3438    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
3439    /**
3440     * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the
3441     * snapshot thread starts {@link CompactingMemStore#snapshot},because
3442     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
3443     */
3444    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
3445    /**
3446     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
3447     */
3448    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3449    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
3450    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
3451    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
3452    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
3453    /**
3454     * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain
3455     * thread could start.
3456     */
3457    private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
3458    /**
3459     * This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the
3460     * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would
3461     * execute,and in memory compact thread would exit,because we expect that in memory compact
3462     * executing only once.
3463     */
3464    private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);
3465
3466    public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
3467      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3468      throws IOException {
3469      super(conf, cellComparator, store, regionServices, compactionPolicy);
3470    }
3471
3472    @Override
3473    public VersionedSegmentsList getImmutableSegments() {
3474      VersionedSegmentsList result = super.getImmutableSegments();
3475      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3476        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
3477        if (currentCount <= 1) {
3478          try {
3479            flattenOneSegmentPreCyclicBarrier.await();
3480          } catch (Throwable e) {
3481            throw new RuntimeException(e);
3482          }
3483        }
3484
3485      }
3486
3487      return result;
3488    }
3489
3490    @Override
3491    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
3492      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3493        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
3494        if (currentCount <= 1) {
3495          try {
3496            flattenOneSegmentPostCyclicBarrier.await();
3497          } catch (Throwable e) {
3498            throw new RuntimeException(e);
3499          }
3500        }
3501
3502        if (currentCount == 2) {
3503          try {
3504            /**
3505             * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull},
3506             * writeAgain thread could start.
3507             */
3508            writeMemStoreAgainStartCyclicBarrier.await();
3509            /**
3510             * Only the writeAgain thread completes, retry
3511             * {@link CompactingMemStore#swapPipelineWithNull} would execute.
3512             */
3513            writeMemStoreAgainEndCyclicBarrier.await();
3514          } catch (Throwable e) {
3515            throw new RuntimeException(e);
3516          }
3517        }
3518
3519      }
3520      boolean result = super.swapPipelineWithNull(segments);
3521      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
3522        int currentCount = swapPipelineWithNullCounter.get();
3523        if (currentCount <= 1) {
3524          assertTrue(!result);
3525        }
3526        if (currentCount == 2) {
3527          assertTrue(result);
3528        }
3529      }
3530      return result;
3531
3532    }
3533
3534    @Override
3535    public void flattenOneSegment(long requesterVersion, Action action) {
3536      int currentCount = flattenOneSegmentCounter.incrementAndGet();
3537      if (currentCount <= 1) {
3538        try {
3539          /**
3540           * {@link CompactingMemStore#snapshot} could start.
3541           */
3542          snapShotStartCyclicCyclicBarrier.await();
3543          flattenOneSegmentPreCyclicBarrier.await();
3544        } catch (Throwable e) {
3545          throw new RuntimeException(e);
3546        }
3547      }
3548      super.flattenOneSegment(requesterVersion, action);
3549      if (currentCount <= 1) {
3550        try {
3551          flattenOneSegmentPostCyclicBarrier.await();
3552          /**
3553           * Only the writeAgain thread completes, in memory compact thread would exit,because we
3554           * expect that in memory compact executing only once.
3555           */
3556          writeMemStoreAgainEndCyclicBarrier.await();
3557        } catch (Throwable e) {
3558          throw new RuntimeException(e);
3559        }
3560
3561      }
3562    }
3563
3564    @Override
3565    protected boolean setInMemoryCompactionFlag() {
3566      boolean result = super.setInMemoryCompactionFlag();
3567      int count = setInMemoryCompactionFlagCounter.incrementAndGet();
3568      if (count <= 1) {
3569        assertTrue(result);
3570      }
3571      if (count == 2) {
3572        assertTrue(!result);
3573      }
3574      return result;
3575    }
3576
3577    @Override
3578    void inMemoryCompaction() {
3579      try {
3580        super.inMemoryCompaction();
3581      } finally {
3582        try {
3583          inMemoryCompactionEndCyclicBarrier.await();
3584        } catch (Throwable e) {
3585          throw new RuntimeException(e);
3586        }
3587
3588      }
3589    }
3590  }
3591
3592  public static class MyCompactingMemStore6 extends CompactingMemStore {
3593    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
3594
3595    public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator,
3596      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
3597      throws IOException {
3598      super(conf, cellComparator, store, regionServices, compactionPolicy);
3599    }
3600
3601    @Override
3602    void inMemoryCompaction() {
3603      try {
3604        super.inMemoryCompaction();
3605      } finally {
3606        try {
3607          inMemoryCompactionEndCyclicBarrier.await();
3608        } catch (Throwable e) {
3609          throw new RuntimeException(e);
3610        }
3611
3612      }
3613    }
3614  }
3615
3616  public static class MyDefaultMemStore extends DefaultMemStore {
3617    private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread";
3618    private static final String FLUSH_THREAD_NAME = "flushMyThread";
3619    /**
3620     * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner thread
3621     * could start.
3622     */
3623    private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2);
3624    /**
3625     * Used by getScanner thread notifies flush thread {@link DefaultMemStore#getSnapshotSegments}
3626     * completed, {@link DefaultMemStore#doClearSnapShot} could continue.
3627     */
3628    private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3629    /**
3630     * Used by flush thread notifies getScanner thread {@link DefaultMemStore#doClearSnapShot}
3631     * completed, {@link DefaultMemStore#getScanners} could continue.
3632     */
3633    private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2);
3634    private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0);
3635    private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0);
3636    private volatile boolean shouldWait = true;
3637    private volatile HStore store = null;
3638
3639    public MyDefaultMemStore(Configuration conf, CellComparator cellComparator,
3640      RegionServicesForStores regionServices) throws IOException {
3641      super(conf, cellComparator, regionServices);
3642    }
3643
3644    @Override
3645    protected List<Segment> getSnapshotSegments() {
3646
3647      List<Segment> result = super.getSnapshotSegments();
3648
3649      if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) {
3650        int currentCount = getSnapshotSegmentsCounter.incrementAndGet();
3651        if (currentCount == 1) {
3652          if (this.shouldWait) {
3653            try {
3654              /**
3655               * Notify flush thread {@link DefaultMemStore#getSnapshotSegments} completed,
3656               * {@link DefaultMemStore#doClearSnapShot} could continue.
3657               */
3658              preClearSnapShotCyclicBarrier.await();
3659              /**
3660               * Wait for {@link DefaultMemStore#doClearSnapShot} completed.
3661               */
3662              postClearSnapShotCyclicBarrier.await();
3663
3664            } catch (Throwable e) {
3665              throw new RuntimeException(e);
3666            }
3667          }
3668        }
3669      }
3670      return result;
3671    }
3672
3673    @Override
3674    protected void doClearSnapShot() {
3675      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3676        int currentCount = clearSnapshotCounter.incrementAndGet();
3677        if (currentCount == 1) {
3678          try {
3679            if (
3680              ((ReentrantReadWriteLock) store.getStoreEngine().getLock())
3681                .isWriteLockedByCurrentThread()
3682            ) {
3683              shouldWait = false;
3684            }
3685            /**
3686             * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner
3687             * thread could start.
3688             */
3689            getScannerCyclicBarrier.await();
3690
3691            if (shouldWait) {
3692              /**
3693               * Wait for {@link DefaultMemStore#getSnapshotSegments} completed.
3694               */
3695              preClearSnapShotCyclicBarrier.await();
3696            }
3697          } catch (Throwable e) {
3698            throw new RuntimeException(e);
3699          }
3700        }
3701      }
3702      super.doClearSnapShot();
3703
3704      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
3705        int currentCount = clearSnapshotCounter.get();
3706        if (currentCount == 1) {
3707          if (shouldWait) {
3708            try {
3709              /**
3710               * Notify getScanner thread {@link DefaultMemStore#doClearSnapShot} completed,
3711               * {@link DefaultMemStore#getScanners} could continue.
3712               */
3713              postClearSnapShotCyclicBarrier.await();
3714            } catch (Throwable e) {
3715              throw new RuntimeException(e);
3716            }
3717          }
3718        }
3719      }
3720    }
3721  }
3722}