001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotEquals;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.lang.management.ManagementFactory;
028import java.util.ArrayList;
029import java.util.List;
030import java.util.concurrent.Executors;
031import java.util.concurrent.ThreadPoolExecutor;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellComparator;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseConfiguration;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.HColumnDescriptor;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.HRegionInfo;
042import org.apache.hadoop.hbase.HTableDescriptor;
043import org.apache.hadoop.hbase.KeepDeletedCells;
044import org.apache.hadoop.hbase.KeyValue;
045import org.apache.hadoop.hbase.KeyValueTestUtil;
046import org.apache.hadoop.hbase.MemoryCompactionPolicy;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
050import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.testclassification.RegionServerTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.EnvironmentEdge;
055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
056import org.apache.hadoop.hbase.util.Threads;
057import org.apache.hadoop.hbase.wal.WAL;
058import org.junit.After;
059import org.junit.Before;
060import org.junit.ClassRule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.mockito.Mockito;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067/**
068 * compacted memstore test case
069 */
070@Category({ RegionServerTests.class, MediumTests.class })
071public class TestCompactingMemStore extends TestDefaultMemStore {
072
  /** Class-level JUnit rule obtained from HBaseClassTestRule.forClass for this test class. */
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompactingMemStore.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestCompactingMemStore.class);
  /** Assigned in compactingSetUp() from ChunkCreator.initialize(...); pool cleared in tearDown(). */
  protected static ChunkCreator chunkCreator;
  /** Region created in compactingSetUp() through HRegion.createHRegion(...). */
  protected HRegion region;
  /** Mockito spy of the region's RegionServicesForStores, installed in compactingSetUp(). */
  protected RegionServicesForStores regionServicesForStores;
  /** Store built on top of the region in compactingSetUp(); handed to the memstore under test. */
  protected HStore store;
082
083  //////////////////////////////////////////////////////////////////////////////
084  // Helpers
085  //////////////////////////////////////////////////////////////////////////////
086  protected static byte[] makeQualifier(final int i1, final int i2) {
087    return Bytes.toBytes(Integer.toString(i1) + ";" + Integer.toString(i2));
088  }
089
090  @After
091  public void tearDown() throws Exception {
092    chunkCreator.clearChunksInPool();
093  }
094
095  @Override
096  @Before
097  public void setUp() throws Exception {
098    compactingSetUp();
099    this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(),
100      CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER);
101    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
102  }
103
104  protected void compactingSetUp() throws Exception {
105    super.internalSetUp();
106    Configuration conf = new Configuration();
107    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
108    conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
109    conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
110    HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf);
111    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
112    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("foobar"));
113    htd.addFamily(hcd);
114    HRegionInfo info = new HRegionInfo(TableName.valueOf("foobar"), null, null, false);
115    WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info);
116    this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true);
117    this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores());
118    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
119    Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
120    this.store = new HStore(region, hcd, conf, false);
121
122    long globalMemStoreLimit =
123      (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
124        * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
125    chunkCreator = ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
126      globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null,
127      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
128    assertNotNull(chunkCreator);
129  }
130
131  /**
132   * A simple test which flush in memory affect timeOfOldestEdit
133   */
134  @Test
135  public void testTimeOfOldestEdit() {
136    assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit());
137    final byte[] r = Bytes.toBytes("r");
138    final byte[] f = Bytes.toBytes("f");
139    final byte[] q = Bytes.toBytes("q");
140    final byte[] v = Bytes.toBytes("v");
141    final KeyValue kv = new KeyValue(r, f, q, v);
142    memstore.add(kv, null);
143    long timeOfOldestEdit = memstore.timeOfOldestEdit();
144    assertNotEquals(Long.MAX_VALUE, timeOfOldestEdit);
145
146    ((CompactingMemStore) memstore).flushInMemory();
147    assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit());
148    memstore.add(kv, null);
149    assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit());
150    memstore.snapshot();
151    assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit());
152  }
153
154  /**
155   * A simple test which verifies the 3 possible states when scanning across snapshot.
156   */
157  @Override
158  @Test
159  public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException {
160    // we are going to the scanning across snapshot with two kvs
161    // kv1 should always be returned before kv2
162    final byte[] one = Bytes.toBytes(1);
163    final byte[] two = Bytes.toBytes(2);
164    final byte[] f = Bytes.toBytes("f");
165    final byte[] q = Bytes.toBytes("q");
166    final byte[] v = Bytes.toBytes(3);
167
168    final KeyValue kv1 = new KeyValue(one, f, q, 10, v);
169    final KeyValue kv2 = new KeyValue(two, f, q, 10, v);
170
171    // use case 1: both kvs in kvset
172    this.memstore.add(kv1.clone(), null);
173    this.memstore.add(kv2.clone(), null);
174    // snapshot is empty,active segment is not empty,
175    // empty segment is skipped.
176    verifyOneScanAcrossSnapshot2(kv1, kv2);
177
178    // use case 2: both kvs in snapshot
179    this.memstore.snapshot();
180    // active segment is empty,snapshot is not empty,
181    // empty segment is skipped.
182    verifyOneScanAcrossSnapshot2(kv1, kv2);
183
184    // use case 3: first in snapshot second in kvset
185    this.memstore = new CompactingMemStore(HBaseConfiguration.create(),
186      CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER);
187    this.memstore.add(kv1.clone(), null);
188    // As compaction is starting in the background the repetition
189    // of the k1 might be removed BUT the scanners created earlier
190    // should look on the OLD MutableCellSetSegment, so this should be OK...
191    this.memstore.snapshot();
192    this.memstore.add(kv2.clone(), null);
193    verifyScanAcrossSnapshot2(kv1, kv2);
194  }
195
196  /**
197   * Test memstore snapshots
198   */
199  @Override
200  @Test
201  public void testSnapshotting() throws IOException {
202    final int snapshotCount = 5;
203    // Add some rows, run a snapshot. Do it a few times.
204    for (int i = 0; i < snapshotCount; i++) {
205      addRows(this.memstore);
206      runSnapshot(this.memstore, true);
207      assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount());
208    }
209  }
210
211  //////////////////////////////////////////////////////////////////////////////
212  // Get tests
213  //////////////////////////////////////////////////////////////////////////////
214
215  /**
216   * Test getNextRow from memstore
217   */
218  @Override
219  @Test
220  public void testGetNextRow() throws Exception {
221    addRows(this.memstore);
222    // Add more versions to make it a little more interesting.
223    Thread.sleep(1);
224    addRows(this.memstore);
225    Cell closestToEmpty = ((CompactingMemStore) this.memstore).getNextRow(KeyValue.LOWESTKEY);
226    assertTrue(CellComparator.getInstance().compareRows(closestToEmpty,
227      new KeyValue(Bytes.toBytes(0), EnvironmentEdgeManager.currentTime())) == 0);
228    for (int i = 0; i < ROW_COUNT; i++) {
229      Cell nr = ((CompactingMemStore) this.memstore)
230        .getNextRow(new KeyValue(Bytes.toBytes(i), EnvironmentEdgeManager.currentTime()));
231      if (i + 1 == ROW_COUNT) {
232        assertNull(nr);
233      } else {
234        assertTrue(CellComparator.getInstance().compareRows(nr,
235          new KeyValue(Bytes.toBytes(i + 1), EnvironmentEdgeManager.currentTime())) == 0);
236      }
237    }
238    // starting from each row, validate results should contain the starting row
239    Configuration conf = HBaseConfiguration.create();
240    for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
241      ScanInfo scanInfo =
242        new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE,
243          HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false);
244      try (InternalScanner scanner =
245        new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null,
246          memstore.getScanners(0))) {
247        List<Cell> results = new ArrayList<>();
248        for (int i = 0; scanner.next(results); i++) {
249          int rowId = startRowId + i;
250          Cell left = results.get(0);
251          byte[] row1 = Bytes.toBytes(rowId);
252          assertTrue("Row name",
253            CellComparator.getInstance().compareRows(left, row1, 0, row1.length) == 0);
254          assertEquals("Count of columns", QUALIFIER_COUNT, results.size());
255          List<Cell> row = new ArrayList<>();
256          for (Cell kv : results) {
257            row.add(kv);
258          }
259          isExpectedRowWithoutTimestamps(rowId, row);
260          // Clear out set. Otherwise row results accumulate.
261          results.clear();
262        }
263      }
264    }
265  }
266
267  @Override
268  @Test
269  public void testGet_memstoreAndSnapShot() throws IOException {
270    byte[] row = Bytes.toBytes("testrow");
271    byte[] fam = Bytes.toBytes("testfamily");
272    byte[] qf1 = Bytes.toBytes("testqualifier1");
273    byte[] qf2 = Bytes.toBytes("testqualifier2");
274    byte[] qf3 = Bytes.toBytes("testqualifier3");
275    byte[] qf4 = Bytes.toBytes("testqualifier4");
276    byte[] qf5 = Bytes.toBytes("testqualifier5");
277    byte[] val = Bytes.toBytes("testval");
278
279    // Setting up memstore
280    memstore.add(new KeyValue(row, fam, qf1, val), null);
281    memstore.add(new KeyValue(row, fam, qf2, val), null);
282    memstore.add(new KeyValue(row, fam, qf3, val), null);
283    // Pushing to pipeline
284    ((CompactingMemStore) memstore).flushInMemory();
285    assertEquals(0, memstore.getSnapshot().getCellsCount());
286    // Creating a snapshot
287    memstore.snapshot();
288    assertEquals(3, memstore.getSnapshot().getCellsCount());
289    // Adding value to "new" memstore
290    assertEquals(0, memstore.getActive().getCellsCount());
291    memstore.add(new KeyValue(row, fam, qf4, val), null);
292    memstore.add(new KeyValue(row, fam, qf5, val), null);
293    assertEquals(2, memstore.getActive().getCellsCount());
294  }
295
296  ////////////////////////////////////
297  // Test for periodic memstore flushes
298  // based on time of oldest edit
299  ////////////////////////////////////
300
301  /**
302   * Add keyvalues with a fixed memstoreTs, and checks that memstore size is decreased as older
303   * keyvalues are deleted from the memstore.
304   */
305  @Override
306  @Test
307  public void testUpsertMemstoreSize() throws Exception {
308    MemStoreSize oldSize = memstore.size();
309
310    List<Cell> l = new ArrayList<>();
311    KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
312    KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v");
313    KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v");
314
315    kv1.setSequenceId(1);
316    kv2.setSequenceId(1);
317    kv3.setSequenceId(1);
318    l.add(kv1);
319    l.add(kv2);
320    l.add(kv3);
321
322    this.memstore.upsert(l, 2, null);// readpoint is 2
323    MemStoreSize newSize = this.memstore.size();
324    assert (newSize.getDataSize() > oldSize.getDataSize());
325    // The kv1 should be removed.
326    assert (memstore.getActive().getCellsCount() == 2);
327
328    KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
329    kv4.setSequenceId(1);
330    l.clear();
331    l.add(kv4);
332    this.memstore.upsert(l, 3, null);
333    assertEquals(newSize, this.memstore.size());
334    // The kv2 should be removed.
335    assert (memstore.getActive().getCellsCount() == 2);
336    // this.memstore = null;
337  }
338
339  /**
340   * Tests that the timeOfOldestEdit is updated correctly for the various edit operations in
341   * memstore.
342   */
343  @Override
344  @Test
345  public void testUpdateToTimeOfOldestEdit() throws Exception {
346    try {
347      EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
348      EnvironmentEdgeManager.injectEdge(edge);
349      long t = memstore.timeOfOldestEdit();
350      assertEquals(Long.MAX_VALUE, t);
351
352      // test the case that the timeOfOldestEdit is updated after a KV add
353      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null);
354      t = memstore.timeOfOldestEdit();
355      assertTrue(t == 1234);
356      // The method will also assert
357      // the value is reset to Long.MAX_VALUE
358      t = runSnapshot(memstore, true);
359
360      // test the case that the timeOfOldestEdit is updated after a KV delete
361      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null);
362      t = memstore.timeOfOldestEdit();
363      assertTrue(t == 1234);
364      t = runSnapshot(memstore, true);
365
366      // test the case that the timeOfOldestEdit is updated after a KV upsert
367      List<Cell> l = new ArrayList<>();
368      KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
369      kv1.setSequenceId(100);
370      l.add(kv1);
371      memstore.upsert(l, 1000, null);
372      t = memstore.timeOfOldestEdit();
373      assertTrue(t == 1234);
374    } finally {
375      EnvironmentEdgeManager.reset();
376    }
377  }
378
379  private long runSnapshot(final AbstractMemStore hmc, boolean useForce) throws IOException {
380    // Save off old state.
381    long oldHistorySize = hmc.getSnapshot().getDataSize();
382    long prevTimeStamp = hmc.timeOfOldestEdit();
383
384    hmc.snapshot();
385    MemStoreSnapshot snapshot = hmc.snapshot();
386    if (useForce) {
387      // Make some assertions about what just happened.
388      assertTrue("History size has not increased", oldHistorySize < snapshot.getDataSize());
389      long t = hmc.timeOfOldestEdit();
390      assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
391      hmc.clearSnapshot(snapshot.getId());
392    } else {
393      long t = hmc.timeOfOldestEdit();
394      assertTrue("Time of oldest edit didn't remain the same", t == prevTimeStamp);
395    }
396    return prevTimeStamp;
397  }
398
399  private void isExpectedRowWithoutTimestamps(final int rowIndex, List<Cell> kvs) {
400    int i = 0;
401    for (Cell kv : kvs) {
402      byte[] expectedColname = makeQualifier(rowIndex, i++);
403      assertTrue("Column name", CellUtil.matchingQualifier(kv, expectedColname));
404      // Value is column name as bytes. Usually result is
405      // 100 bytes in size at least. This is the default size
406      // for BytesWriteable. For comparison, convert bytes to
407      // String and trim to remove trailing null bytes.
408      assertTrue("Content", CellUtil.matchingValue(kv, expectedColname));
409    }
410  }
411
412  @Test
413  public void testPuttingBackChunksAfterFlushing() throws IOException {
414    byte[] row = Bytes.toBytes("testrow");
415    byte[] fam = Bytes.toBytes("testfamily");
416    byte[] qf1 = Bytes.toBytes("testqualifier1");
417    byte[] qf2 = Bytes.toBytes("testqualifier2");
418    byte[] qf3 = Bytes.toBytes("testqualifier3");
419    byte[] qf4 = Bytes.toBytes("testqualifier4");
420    byte[] qf5 = Bytes.toBytes("testqualifier5");
421    byte[] val = Bytes.toBytes("testval");
422
423    // Setting up memstore
424    memstore.add(new KeyValue(row, fam, qf1, val), null);
425    memstore.add(new KeyValue(row, fam, qf2, val), null);
426    memstore.add(new KeyValue(row, fam, qf3, val), null);
427
428    // Creating a snapshot
429    MemStoreSnapshot snapshot = memstore.snapshot();
430    assertEquals(3, memstore.getSnapshot().getCellsCount());
431
432    // Adding value to "new" memstore
433    assertEquals(0, memstore.getActive().getCellsCount());
434    memstore.add(new KeyValue(row, fam, qf4, val), null);
435    memstore.add(new KeyValue(row, fam, qf5, val), null);
436    assertEquals(2, memstore.getActive().getCellsCount());
437    // close the scanners
438    for (KeyValueScanner scanner : snapshot.getScanners()) {
439      scanner.close();
440    }
441    memstore.clearSnapshot(snapshot.getId());
442
443    int chunkCount = chunkCreator.getPoolSize();
444    assertTrue(chunkCount > 0);
445
446  }
447
448  @Test
449  public void testPuttingBackChunksWithOpeningScanner() throws IOException {
450    byte[] row = Bytes.toBytes("testrow");
451    byte[] fam = Bytes.toBytes("testfamily");
452    byte[] qf1 = Bytes.toBytes("testqualifier1");
453    byte[] qf2 = Bytes.toBytes("testqualifier2");
454    byte[] qf3 = Bytes.toBytes("testqualifier3");
455    byte[] qf4 = Bytes.toBytes("testqualifier4");
456    byte[] qf5 = Bytes.toBytes("testqualifier5");
457    byte[] qf6 = Bytes.toBytes("testqualifier6");
458    byte[] qf7 = Bytes.toBytes("testqualifier7");
459    byte[] val = Bytes.toBytes("testval");
460
461    // Setting up memstore
462    memstore.add(new KeyValue(row, fam, qf1, val), null);
463    memstore.add(new KeyValue(row, fam, qf2, val), null);
464    memstore.add(new KeyValue(row, fam, qf3, val), null);
465
466    // Creating a snapshot
467    MemStoreSnapshot snapshot = memstore.snapshot();
468    assertEquals(3, memstore.getSnapshot().getCellsCount());
469
470    // Adding value to "new" memstore
471    assertEquals(0, memstore.getActive().getCellsCount());
472    memstore.add(new KeyValue(row, fam, qf4, val), null);
473    memstore.add(new KeyValue(row, fam, qf5, val), null);
474    assertEquals(2, memstore.getActive().getCellsCount());
475
476    // opening scanner before clear the snapshot
477    List<KeyValueScanner> scanners = memstore.getScanners(0);
478    // Shouldn't putting back the chunks to pool,since some scanners are opening
479    // based on their data
480    // close the scanners
481    for (KeyValueScanner scanner : snapshot.getScanners()) {
482      scanner.close();
483    }
484    memstore.clearSnapshot(snapshot.getId());
485
486    assertTrue(chunkCreator.getPoolSize() == 0);
487
488    // Chunks will be put back to pool after close scanners;
489    for (KeyValueScanner scanner : scanners) {
490      scanner.close();
491    }
492    assertTrue(chunkCreator.getPoolSize() > 0);
493
494    // clear chunks
495    chunkCreator.clearChunksInPool();
496
497    // Creating another snapshot
498
499    snapshot = memstore.snapshot();
500    // Adding more value
501    memstore.add(new KeyValue(row, fam, qf6, val), null);
502    memstore.add(new KeyValue(row, fam, qf7, val), null);
503    // opening scanners
504    scanners = memstore.getScanners(0);
505    // close scanners before clear the snapshot
506    for (KeyValueScanner scanner : scanners) {
507      scanner.close();
508    }
509    // Since no opening scanner, the chunks of snapshot should be put back to
510    // pool
511    // close the scanners
512    for (KeyValueScanner scanner : snapshot.getScanners()) {
513      scanner.close();
514    }
515    memstore.clearSnapshot(snapshot.getId());
516    assertTrue(chunkCreator.getPoolSize() > 0);
517  }
518
519  @Test
520  public void testPuttingBackChunksWithOpeningPipelineScanner() throws IOException {
521
522    // set memstore to do data compaction and not to use the speculative scan
523    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
524    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
525      String.valueOf(compactionType));
526    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
527
528    byte[] row = Bytes.toBytes("testrow");
529    byte[] fam = Bytes.toBytes("testfamily");
530    byte[] qf1 = Bytes.toBytes("testqualifier1");
531    byte[] qf2 = Bytes.toBytes("testqualifier2");
532    byte[] qf3 = Bytes.toBytes("testqualifier3");
533    byte[] val = Bytes.toBytes("testval");
534
535    // Setting up memstore
536    memstore.add(new KeyValue(row, fam, qf1, 1, val), null);
537    memstore.add(new KeyValue(row, fam, qf2, 1, val), null);
538    memstore.add(new KeyValue(row, fam, qf3, 1, val), null);
539
540    // Creating a pipeline
541    ((MyCompactingMemStore) memstore).disableCompaction();
542    ((CompactingMemStore) memstore).flushInMemory();
543
544    // Adding value to "new" memstore
545    assertEquals(0, memstore.getActive().getCellsCount());
546    memstore.add(new KeyValue(row, fam, qf1, 2, val), null);
547    memstore.add(new KeyValue(row, fam, qf2, 2, val), null);
548    assertEquals(2, memstore.getActive().getCellsCount());
549
550    // pipeline bucket 2
551    ((CompactingMemStore) memstore).flushInMemory();
552    // opening scanner before force flushing
553    List<KeyValueScanner> scanners = memstore.getScanners(0);
554    // Shouldn't putting back the chunks to pool,since some scanners are opening
555    // based on their data
556    ((MyCompactingMemStore) memstore).enableCompaction();
557    // trigger compaction
558    ((CompactingMemStore) memstore).flushInMemory();
559
560    // Adding value to "new" memstore
561    assertEquals(0, memstore.getActive().getCellsCount());
562    memstore.add(new KeyValue(row, fam, qf3, 3, val), null);
563    memstore.add(new KeyValue(row, fam, qf2, 3, val), null);
564    memstore.add(new KeyValue(row, fam, qf1, 3, val), null);
565    assertEquals(3, memstore.getActive().getCellsCount());
566
567    assertTrue(chunkCreator.getPoolSize() == 0);
568
569    // Chunks will be put back to pool after close scanners;
570    for (KeyValueScanner scanner : scanners) {
571      scanner.close();
572    }
573    assertTrue(chunkCreator.getPoolSize() > 0);
574
575    // clear chunks
576    chunkCreator.clearChunksInPool();
577
578    // Creating another snapshot
579
580    MemStoreSnapshot snapshot = memstore.snapshot();
581    // close the scanners
582    for (KeyValueScanner scanner : snapshot.getScanners()) {
583      scanner.close();
584    }
585    memstore.clearSnapshot(snapshot.getId());
586
587    snapshot = memstore.snapshot();
588    // Adding more value
589    memstore.add(new KeyValue(row, fam, qf2, 4, val), null);
590    memstore.add(new KeyValue(row, fam, qf3, 4, val), null);
591    // opening scanners
592    scanners = memstore.getScanners(0);
593    // close scanners before clear the snapshot
594    for (KeyValueScanner scanner : scanners) {
595      scanner.close();
596    }
597    // Since no opening scanner, the chunks of snapshot should be put back to
598    // pool
599    // close the scanners
600    for (KeyValueScanner scanner : snapshot.getScanners()) {
601      scanner.close();
602    }
603    memstore.clearSnapshot(snapshot.getId());
604    assertTrue(chunkCreator.getPoolSize() > 0);
605  }
606
607  //////////////////////////////////////////////////////////////////////////////
608  // Compaction tests
609  //////////////////////////////////////////////////////////////////////////////
610  @Test
611  public void testCompaction1Bucket() throws IOException {
612
613    // set memstore to do basic structure flattening, the "eager" option is tested in
614    // TestCompactingToCellFlatMapMemStore
615    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
616    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
617      String.valueOf(compactionType));
618    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
619
620    String[] keys1 = { "A", "A", "B", "C" }; // A1, A2, B3, C4
621
622    // test 1 bucket
623    int totalCellsLen = addRowsByKeys(memstore, keys1);
624    int oneCellOnCSLMHeapSize = 120;
625    int oneCellOnCAHeapSize = 88;
626    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
627    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
628    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
629
630    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
631    assertEquals(0, memstore.getSnapshot().getCellsCount());
632    // There is no compaction, as the compacting memstore type is basic.
633    // totalCellsLen remains the same
634    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
635      + 4 * oneCellOnCAHeapSize;
636    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
637    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
638
639    MemStoreSize mss = memstore.getFlushableSize();
640    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
641    // simulate flusher
642    region.decrMemStoreSize(mss);
643    ImmutableSegment s = memstore.getSnapshot();
644    assertEquals(4, s.getCellsCount());
645    assertEquals(0, regionServicesForStores.getMemStoreSize());
646
647    memstore.clearSnapshot(snapshot.getId());
648  }
649
650  @Test
651  public void testCompaction2Buckets() throws IOException {
652
653    // set memstore to do basic structure flattening, the "eager" option is tested in
654    // TestCompactingToCellFlatMapMemStore
655    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
656    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
657      String.valueOf(compactionType));
658    memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
659      String.valueOf(1));
660    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
661    String[] keys1 = { "A", "A", "B", "C" };
662    String[] keys2 = { "A", "B", "D" };
663
664    int totalCellsLen1 = addRowsByKeys(memstore, keys1);
665    int oneCellOnCSLMHeapSize = 120;
666    int oneCellOnCAHeapSize = 88;
667    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
668
669    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
670    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
671
672    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
673    int counter = 0;
674    for (Segment s : memstore.getSegments()) {
675      counter += s.getCellsCount();
676    }
677    assertEquals(4, counter);
678    assertEquals(0, memstore.getSnapshot().getCellsCount());
679    // There is no compaction, as the compacting memstore type is basic.
680    // totalCellsLen remains the same
681    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
682    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
683      + 4 * oneCellOnCAHeapSize;
684    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
685
686    int totalCellsLen2 = addRowsByKeys(memstore, keys2);
687    totalHeapSize += 3 * oneCellOnCSLMHeapSize;
688    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
689    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
690
691    MemStoreSize mss = memstore.getFlushableSize();
692    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
693    assertEquals(0, memstore.getSnapshot().getCellsCount());
694    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
695    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
696      + 7 * oneCellOnCAHeapSize;
697    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
698
699    mss = memstore.getFlushableSize();
700    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
701    // simulate flusher
702    region.decrMemStoreSize(mss);
703    ImmutableSegment s = memstore.getSnapshot();
704    assertEquals(7, s.getCellsCount());
705    assertEquals(0, regionServicesForStores.getMemStoreSize());
706
707    memstore.clearSnapshot(snapshot.getId());
708  }
709
710  @Test
711  public void testCompaction3Buckets() throws IOException {
712
713    // set memstore to do data compaction and not to use the speculative scan
714    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
715    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
716      String.valueOf(compactionType));
717    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
718    String[] keys1 = { "A", "A", "B", "C" };
719    String[] keys2 = { "A", "B", "D" };
720    String[] keys3 = { "D", "B", "B" };
721
722    int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 4 cells.
723    int oneCellOnCSLMHeapSize = 120;
724    int oneCellOnCAHeapSize = 88;
725    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
726    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
727    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
728    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
729
730    assertEquals(0, memstore.getSnapshot().getCellsCount());
731    // One cell is duplicated and the compaction will remove it. All cells of same time so adjusting
732    // totalCellsLen
733    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
734    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
735    // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff.
736    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
737      + 3 * oneCellOnCAHeapSize;
738    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
739
740    int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells.
741    long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize;
742
743    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
744    assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
745
746    ((MyCompactingMemStore) memstore).disableCompaction();
747    MemStoreSize mss = memstore.getFlushableSize();
748    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
749    assertEquals(0, memstore.getSnapshot().getCellsCount());
750    // No change in the cells data size. ie. memstore size. as there is no compaction.
751    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
752    assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM,
753      ((CompactingMemStore) memstore).heapSize());
754
755    int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added
756    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
757      regionServicesForStores.getMemStoreSize());
758    long totalHeapSize3 =
759      totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM + 3 * oneCellOnCSLMHeapSize;
760    assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());
761
762    ((MyCompactingMemStore) memstore).enableCompaction();
763    mss = memstore.getFlushableSize();
764    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
765    assertEquals(0, memstore.getSnapshot().getCellsCount());
766    // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
767    // Out of total 10, only 4 cells are unique
768    totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated
769    totalCellsLen3 = 0;// All duplicated cells.
770    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
771      regionServicesForStores.getMemStoreSize());
772    // Only 4 unique cells left
773    assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD
774      + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize());
775
776    mss = memstore.getFlushableSize();
777    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
778    // simulate flusher
779    region.decrMemStoreSize(mss);
780    ImmutableSegment s = memstore.getSnapshot();
781    assertEquals(4, s.getCellsCount());
782    assertEquals(0, regionServicesForStores.getMemStoreSize());
783
784    memstore.clearSnapshot(snapshot.getId());
785  }
786
787  @Test
788  public void testMagicCompaction3Buckets() throws IOException {
789
790    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE;
791    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
792      String.valueOf(compactionType));
793    memstore.getConfiguration()
794      .setDouble(AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45);
795    memstore.getConfiguration()
796      .setInt(AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2);
797    memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1);
798    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
799
800    String[] keys1 = { "A", "B", "D" };
801    String[] keys2 = { "A" };
802    String[] keys3 = { "A", "A", "B", "C" };
803    String[] keys4 = { "D", "B", "B" };
804
805    int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells.
806    int oneCellOnCSLMHeapSize = 120;
807    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
808    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize;
809    assertEquals(totalHeapSize, memstore.heapSize());
810
811    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline - flatten
812    assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
813    assertEquals(1.0,
814      ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
815    assertEquals(0, memstore.getSnapshot().getCellsCount());
816
817    addRowsByKeys(memstore, keys2);// Adding 1 more cell - flatten.
818    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
819    assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
820    assertEquals(1.0,
821      ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
822    assertEquals(0, memstore.getSnapshot().getCellsCount());
823
824    addRowsByKeys(memstore, keys3);// Adding 4 more cells - merge.
825    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
826    assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
827    assertEquals((4.0 / 8.0),
828      ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
829    assertEquals(0, memstore.getSnapshot().getCellsCount());
830
831    addRowsByKeys(memstore, keys4);// 3 more cells added - compact (or not)
832    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
833    int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells();
834    assertTrue(4 == numCells || 11 == numCells);
835    assertEquals(0, memstore.getSnapshot().getCellsCount());
836
837    MemStoreSize mss = memstore.getFlushableSize();
838    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
839    // simulate flusher
840    region.decrMemStoreSize(mss);
841    ImmutableSegment s = memstore.getSnapshot();
842    numCells = s.getCellsCount();
843    assertTrue(4 == numCells || 11 == numCells);
844    assertEquals(0, regionServicesForStores.getMemStoreSize());
845
846    memstore.clearSnapshot(snapshot.getId());
847  }
848
849  protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
850    byte[] fam = Bytes.toBytes("testfamily");
851    byte[] qf = Bytes.toBytes("testqualifier");
852    long size = hmc.getActive().getDataSize();
853    long heapOverhead = hmc.getActive().getHeapSize();
854    int cellsCount = hmc.getActive().getCellsCount();
855    int totalLen = 0;
856    for (int i = 0; i < keys.length; i++) {
857      long timestamp = EnvironmentEdgeManager.currentTime();
858      Threads.sleep(1); // to make sure each kv gets a different ts
859      byte[] row = Bytes.toBytes(keys[i]);
860      byte[] val = Bytes.toBytes(keys[i] + i);
861      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
862      totalLen += Segment.getCellLength(kv);
863      hmc.add(kv, null);
864      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
865    }
866    regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
867      hmc.getActive().getHeapSize() - heapOverhead, 0,
868      hmc.getActive().getCellsCount() - cellsCount);
869    return totalLen;
870  }
871
872  // for controlling the val size when adding a new cell
873  protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) {
874    byte[] fam = Bytes.toBytes("testfamily");
875    byte[] qf = Bytes.toBytes("testqualifier");
876    long size = hmc.getActive().getDataSize();
877    long heapOverhead = hmc.getActive().getHeapSize();
878    int cellsCount = hmc.getActive().getCellsCount();
879    int totalLen = 0;
880    for (int i = 0; i < keys.length; i++) {
881      long timestamp = EnvironmentEdgeManager.currentTime();
882      Threads.sleep(1); // to make sure each kv gets a different ts
883      byte[] row = Bytes.toBytes(keys[i]);
884      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
885      totalLen += Segment.getCellLength(kv);
886      hmc.add(kv, null);
887      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
888    }
889    regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
890      hmc.getActive().getHeapSize() - heapOverhead, 0, cellsCount);
891    return totalLen;
892  }
893
894  private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
895    long t = 1234;
896
897    @Override
898    public long currentTime() {
899      return t;
900    }
901
902    public void setCurrentTimeMillis(long t) {
903      this.t = t;
904    }
905  }
906
907  static protected class MyCompactingMemStore extends CompactingMemStore {
908
909    public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store,
910      RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
911      throws IOException {
912      super(conf, c, store, regionServices, compactionPolicy);
913    }
914
915    void disableCompaction() {
916      allowCompaction.set(false);
917    }
918
919    void enableCompaction() {
920      allowCompaction.set(true);
921    }
922
923    void initiateType(MemoryCompactionPolicy compactionType, Configuration conf)
924      throws IllegalArgumentIOException {
925      compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST");
926    }
927
928  }
929}