/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compacting memstore test case.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestCompactingMemStore extends TestDefaultMemStore {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompactingMemStore.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestCompactingMemStore.class);
  protected static ChunkCreator chunkCreator;
  protected HRegion region;
  protected RegionServicesForStores regionServicesForStores;
  protected HStore store;

  //////////////////////////////////////////////////////////////////////////////
  // Helpers
  //////////////////////////////////////////////////////////////////////////////
  protected static byte[] makeQualifier(final int i1, final int i2) {
    return Bytes.toBytes(Integer.toString(i1) + ";" + Integer.toString(i2));
  }

  @After
  public void tearDown() throws Exception {
    chunkCreator.clearChunksInPool();
  }

  @Override
  @Before
  public void setUp() throws Exception {
    compactingSetUp();
    this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(),
      CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER);
    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
  }

  protected void compactingSetUp() throws Exception {
    super.internalSetUp();
    Configuration conf = new Configuration();
    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
    conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
    conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
    HBaseTestingUtil hbaseUtility = new HBaseTestingUtil(conf);
    ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.of(FAMILY);
    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf("foobar"))
      .setColumnFamily(familyDescriptor).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf("foobar")).build();
    WAL wal = HBaseTestingUtil.createWal(conf, hbaseUtility.getDataTestDir(), info);
    this.region =
      HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, tableDescriptor, wal, true);
    this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores());
    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
    this.store = new HStore(region, familyDescriptor, conf, false);

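    // Initialize the shared ChunkCreator (which backs the MSLAB chunk pool) from the JVM heap
    // size and the global memstore heap percentage, so the chunk-pool assertions in the tests
    // below have a pool to inspect.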
    long globalMemStoreLimit =
      (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
        * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
    chunkCreator = ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
      globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    assertNotNull(chunkCreator);
  }

  /**
   * A simple test which verifies that an in-memory flush does not affect timeOfOldestEdit.
   */
  @Test
  public void testTimeOfOldestEdit() {
    assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit());
    final byte[] r = Bytes.toBytes("r");
    final byte[] f = Bytes.toBytes("f");
    final byte[] q = Bytes.toBytes("q");
    final byte[] v = Bytes.toBytes("v");
    final KeyValue kv = new KeyValue(r, f, q, v);
    memstore.add(kv, null);
    long timeOfOldestEdit = memstore.timeOfOldestEdit();
    assertNotEquals(Long.MAX_VALUE, timeOfOldestEdit);

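    // An in-memory flush only moves the active segment into the compaction pipeline; it is not a
    // real flush, so the time of the oldest edit must stay the same.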
    ((CompactingMemStore) memstore).flushInMemory();
    assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit());
    memstore.add(kv, null);
    assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit());
    memstore.snapshot();
    assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit());
  }

  /**
   * A simple test which verifies the three possible states when scanning across a snapshot.
   */
  @Override
  @Test
  public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException {
    // we are going to test scanning across a snapshot with two kvs
    // kv1 should always be returned before kv2
    final byte[] one = Bytes.toBytes(1);
    final byte[] two = Bytes.toBytes(2);
    final byte[] f = Bytes.toBytes("f");
    final byte[] q = Bytes.toBytes("q");
    final byte[] v = Bytes.toBytes(3);

    final KeyValue kv1 = new KeyValue(one, f, q, 10, v);
    final KeyValue kv2 = new KeyValue(two, f, q, 10, v);

    // use case 1: both kvs in kvset
    this.memstore.add(kv1.clone(), null);
    this.memstore.add(kv2.clone(), null);
    // snapshot is empty, active segment is not empty,
    // the empty segment is skipped.
    verifyOneScanAcrossSnapshot2(kv1, kv2);

    // use case 2: both kvs in snapshot
    this.memstore.snapshot();
    // active segment is empty, snapshot is not empty,
    // the empty segment is skipped.
    verifyOneScanAcrossSnapshot2(kv1, kv2);

    // use case 3: first in snapshot second in kvset
    this.memstore = new CompactingMemStore(HBaseConfiguration.create(),
      CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER);
    this.memstore.add(kv1.clone(), null);
    // As compaction is running in the background, the duplicate of kv1 might be removed, BUT the
    // scanners created earlier should still read from the OLD MutableCellSetSegment, so this
    // should be OK...
    this.memstore.snapshot();
    this.memstore.add(kv2.clone(), null);
    verifyScanAcrossSnapshot2(kv1, kv2);
  }

  /**
   * Test memstore snapshots
   */
  @Override
  @Test
  public void testSnapshotting() throws IOException {
    final int snapshotCount = 5;
    // Add some rows, run a snapshot. Do it a few times.
    for (int i = 0; i < snapshotCount; i++) {
      addRows(this.memstore);
      runSnapshot(this.memstore, true);
      assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount());
    }
  }

  //////////////////////////////////////////////////////////////////////////////
  // Get tests
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Test getNextRow from memstore
   */
  @Override
  @Test
  public void testGetNextRow() throws Exception {
    addRows(this.memstore);
    // Add more versions to make it a little more interesting.
    Thread.sleep(1);
    addRows(this.memstore);
    Cell closestToEmpty = ((CompactingMemStore) this.memstore).getNextRow(KeyValue.LOWESTKEY);
    assertTrue(CellComparator.getInstance().compareRows(closestToEmpty,
      new KeyValue(Bytes.toBytes(0), EnvironmentEdgeManager.currentTime())) == 0);
    for (int i = 0; i < ROW_COUNT; i++) {
      Cell nr = ((CompactingMemStore) this.memstore)
        .getNextRow(new KeyValue(Bytes.toBytes(i), EnvironmentEdgeManager.currentTime()));
      if (i + 1 == ROW_COUNT) {
        assertNull(nr);
      } else {
        assertTrue(CellComparator.getInstance().compareRows(nr,
          new KeyValue(Bytes.toBytes(i + 1), EnvironmentEdgeManager.currentTime())) == 0);
      }
    }
    // starting from each row, validate that the results contain the starting row
    Configuration conf = HBaseConfiguration.create();
    for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
      ScanInfo scanInfo =
        new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE,
          HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false);
      try (InternalScanner scanner =
        new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null,
          memstore.getScanners(0))) {
        List<Cell> results = new ArrayList<>();
        for (int i = 0; scanner.next(results); i++) {
          int rowId = startRowId + i;
          Cell left = results.get(0);
          byte[] row1 = Bytes.toBytes(rowId);
          assertTrue("Row name",
            CellComparator.getInstance().compareRows(left, row1, 0, row1.length) == 0);
          assertEquals("Count of columns", QUALIFIER_COUNT, results.size());
          List<Cell> row = new ArrayList<>();
          for (Cell kv : results) {
            row.add(kv);
          }
          isExpectedRowWithoutTimestamps(rowId, row);
          // Clear out set. Otherwise row results accumulate.
          results.clear();
        }
      }
    }
  }

  @Override
  @Test
  public void testGet_memstoreAndSnapShot() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);
    // Pushing to pipeline
    ((CompactingMemStore) memstore).flushInMemory();
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // Creating a snapshot
    memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());
    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());
  }

  ////////////////////////////////////
  // Test for periodic memstore flushes
  // based on time of oldest edit
  ////////////////////////////////////

  /**
   * Adds keyvalues with a fixed memstoreTs and checks that the memstore size is decreased as older
   * keyvalues are removed from the memstore by the upserts.
   */
  @Override
  @Test
  public void testUpsertMemstoreSize() throws Exception {
    MemStoreSize oldSize = memstore.size();

    List<ExtendedCell> l = new ArrayList<>();
    KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
    KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v");
    KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v");

    kv1.setSequenceId(1);
    kv2.setSequenceId(1);
    kv3.setSequenceId(1);
    l.add(kv1);
    l.add(kv2);
    l.add(kv3);

    this.memstore.upsert(l, 2, null); // readpoint is 2
    MemStoreSize newSize = this.memstore.size();
    assertTrue(newSize.getDataSize() > oldSize.getDataSize());
    // The kv1 should be removed.
    assertEquals(2, memstore.getActive().getCellsCount());

    KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
    kv4.setSequenceId(1);
    l.clear();
    l.add(kv4);
    this.memstore.upsert(l, 3, null);
    assertEquals(newSize, this.memstore.size());
    // The kv2 should be removed.
    assertEquals(2, memstore.getActive().getCellsCount());
  }

  /**
   * Tests that the timeOfOldestEdit is updated correctly for the various edit operations in
   * memstore.
   */
  @Override
  @Test
  public void testUpdateToTimeOfOldestEdit() throws Exception {
    try {
      EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
      EnvironmentEdgeManager.injectEdge(edge);
      long t = memstore.timeOfOldestEdit();
      assertEquals(Long.MAX_VALUE, t);

      // test the case that the timeOfOldestEdit is updated after a KV add
      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null);
      t = memstore.timeOfOldestEdit();
      assertTrue(t == 1234);
      // The method will also assert
      // the value is reset to Long.MAX_VALUE
      t = runSnapshot(memstore, true);

      // test the case that the timeOfOldestEdit is updated after a KV delete
      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null);
      t = memstore.timeOfOldestEdit();
      assertTrue(t == 1234);
      t = runSnapshot(memstore, true);

      // test the case that the timeOfOldestEdit is updated after a KV upsert
      List<ExtendedCell> l = new ArrayList<>();
      KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
      kv1.setSequenceId(100);
      l.add(kv1);
      memstore.upsert(l, 1000, null);
      t = memstore.timeOfOldestEdit();
      assertTrue(t == 1234);
    } finally {
      EnvironmentEdgeManager.reset();
    }
  }

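  /**
   * Triggers a snapshot on the given memstore. When {@code useForce} is true, asserts that the
   * snapshot grew and that timeOfOldestEdit was reset to Long.MAX_VALUE, then clears the snapshot;
   * otherwise asserts that timeOfOldestEdit is unchanged. Returns the timeOfOldestEdit observed
   * before the snapshot.
   */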
  private long runSnapshot(final AbstractMemStore hmc, boolean useForce) throws IOException {
    // Save off old state.
    long oldHistorySize = hmc.getSnapshot().getDataSize();
    long prevTimeStamp = hmc.timeOfOldestEdit();

    hmc.snapshot();
    MemStoreSnapshot snapshot = hmc.snapshot();
    if (useForce) {
      // Make some assertions about what just happened.
      assertTrue("History size has not increased", oldHistorySize < snapshot.getDataSize());
      long t = hmc.timeOfOldestEdit();
      assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
      hmc.clearSnapshot(snapshot.getId());
    } else {
      long t = hmc.timeOfOldestEdit();
      assertTrue("Time of oldest edit didn't remain the same", t == prevTimeStamp);
    }
    return prevTimeStamp;
  }

  private void isExpectedRowWithoutTimestamps(final int rowIndex, List<Cell> kvs) {
    int i = 0;
    for (Cell kv : kvs) {
      byte[] expectedColname = makeQualifier(rowIndex, i++);
      assertTrue("Column name", CellUtil.matchingQualifier(kv, expectedColname));
      // The cells were written with the qualifier bytes as their value, so the value must match
      // the expected column name as well.
      assertTrue("Content", CellUtil.matchingValue(kv, expectedColname));
    }
  }

  @Test
  public void testPuttingBackChunksAfterFlushing() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);

    // Creating a snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());
    // close the scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

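    // Once the snapshot is cleared and its scanners are closed, the MSLAB chunks it used should
    // have been returned to the pool.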
    int chunkCount = chunkCreator.getPoolSize();
    assertTrue(chunkCount > 0);
  }

  @Test
  public void testPuttingBackChunksWithOpeningScanner() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] qf6 = Bytes.toBytes("testqualifier6");
    byte[] qf7 = Bytes.toBytes("testqualifier7");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);

    // Creating a snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());

    // opening a scanner before clearing the snapshot
    List<KeyValueScanner> scanners = memstore.getScanners(0);
    // The chunks shouldn't be put back to the pool, since some scanners are still open on their
    // data.
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    assertTrue(chunkCreator.getPoolSize() == 0);

    // Chunks are put back to the pool after the scanners are closed.
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    assertTrue(chunkCreator.getPoolSize() > 0);

    // clear chunks
    chunkCreator.clearChunksInPool();

    // Creating another snapshot
    snapshot = memstore.snapshot();
    // Adding more values
    memstore.add(new KeyValue(row, fam, qf6, val), null);
    memstore.add(new KeyValue(row, fam, qf7, val), null);
    // opening scanners
    scanners = memstore.getScanners(0);
    // close these scanners before clearing the snapshot
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    // Since no scanner is open any more, the chunks of the snapshot should be put back to the
    // pool.
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());
    assertTrue(chunkCreator.getPoolSize() > 0);
  }

  @Test
  public void testPuttingBackChunksWithOpeningPipelineScanner() throws IOException {

    // set memstore to do data compaction and not to use the speculative scan
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());

    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, 1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, 1, val), null);
    memstore.add(new KeyValue(row, fam, qf3, 1, val), null);

    // Creating a pipeline
    ((MyCompactingMemStore) memstore).disableCompaction();
    ((CompactingMemStore) memstore).flushInMemory();

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf1, 2, val), null);
    memstore.add(new KeyValue(row, fam, qf2, 2, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());

    // pipeline bucket 2
    ((CompactingMemStore) memstore).flushInMemory();
    // opening scanners before forcing a flush
    List<KeyValueScanner> scanners = memstore.getScanners(0);
    // The chunks shouldn't be put back to the pool, since some scanners are still open on their
    // data.
    ((MyCompactingMemStore) memstore).enableCompaction();
    // trigger compaction
    ((CompactingMemStore) memstore).flushInMemory();

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf3, 3, val), null);
    memstore.add(new KeyValue(row, fam, qf2, 3, val), null);
    memstore.add(new KeyValue(row, fam, qf1, 3, val), null);
    assertEquals(3, memstore.getActive().getCellsCount());

    assertTrue(chunkCreator.getPoolSize() == 0);

    // Chunks are put back to the pool after the scanners are closed.
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    assertTrue(chunkCreator.getPoolSize() > 0);

    // clear chunks
    chunkCreator.clearChunksInPool();

    // Creating another snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    snapshot = memstore.snapshot();
    // Adding more values
    memstore.add(new KeyValue(row, fam, qf2, 4, val), null);
    memstore.add(new KeyValue(row, fam, qf3, 4, val), null);
    // opening scanners
    scanners = memstore.getScanners(0);
    // close these scanners before clearing the snapshot
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    // Since no scanner is open any more, the chunks of the snapshot should be put back to the
    // pool.
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());
    assertTrue(chunkCreator.getPoolSize() > 0);
  }

  //////////////////////////////////////////////////////////////////////////////
  // Compaction tests
  //////////////////////////////////////////////////////////////////////////////
  @Test
  public void testCompaction1Bucket() throws IOException {

    // set memstore to do basic structure flattening, the "eager" option is tested in
    // TestCompactingToCellFlatMapMemStore
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());

    String[] keys1 = { "A", "A", "B", "C" }; // A1, A2, B3, C4

    // test 1 bucket
    int totalCellsLen = addRowsByKeys(memstore, keys1);
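    // Per-cell heap sizes used in the assertions below: a cell indexed by the CSLM-based mutable
    // segment vs. a cell indexed by the CellArrayMap-based immutable segment.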
    int oneCellOnCSLMHeapSize = 120;
    int oneCellOnCAHeapSize = 88;
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // There is no compaction, as the compacting memstore type is basic.
    // totalCellsLen remains the same
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
      + 4 * oneCellOnCAHeapSize;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(4, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  @Test
  public void testCompaction2Buckets() throws IOException {

    // set memstore to do basic structure flattening, the "eager" option is tested in
    // TestCompactingToCellFlatMapMemStore
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
      String.valueOf(1));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    String[] keys1 = { "A", "A", "B", "C" };
    String[] keys2 = { "A", "B", "D" };

    int totalCellsLen1 = addRowsByKeys(memstore, keys1);
    int oneCellOnCSLMHeapSize = 120;
    int oneCellOnCAHeapSize = 88;
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;

    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    int counter = 0;
    for (Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(4, counter);
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // There is no compaction, as the compacting memstore type is basic.
    // totalCellsLen remains the same
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
      + 4 * oneCellOnCAHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    int totalCellsLen2 = addRowsByKeys(memstore, keys2);
    totalHeapSize += 3 * oneCellOnCSLMHeapSize;
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
      + 7 * oneCellOnCAHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(7, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  @Test
  public void testCompaction3Buckets() throws IOException {

    // set memstore to do data compaction and not to use the speculative scan
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    String[] keys1 = { "A", "A", "B", "C" };
    String[] keys2 = { "A", "B", "D" };
    String[] keys3 = { "D", "B", "B" };

    int totalCellsLen1 = addRowsByKeys(memstore, keys1); // Adding 4 cells.
    int oneCellOnCSLMHeapSize = 120;
    int oneCellOnCAHeapSize = 88;
    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact

    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
    // so adjust totalCellsLen accordingly.
    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff.
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
      + 3 * oneCellOnCAHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    int totalCellsLen2 = addRowsByKeys(memstore, keys2); // Adding 3 more cells.
    long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize;

    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    ((MyCompactingMemStore) memstore).disableCompaction();
    MemStoreSize mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // No change in the cells data size, i.e. the memstore size, as there is no compaction.
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM,
      ((CompactingMemStore) memstore).heapSize());

    int totalCellsLen3 = addRowsByKeys(memstore, keys3); // 3 more cells added
    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
      regionServicesForStores.getMemStoreSize());
    long totalHeapSize3 =
      totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM + 3 * oneCellOnCSLMHeapSize;
    assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());

    ((MyCompactingMemStore) memstore).enableCompaction();
    mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // active flushed to pipeline and all 3 segments compacted. This gets rid of the duplicated
    // cells: out of the total 10 cells, only 4 are unique.
    totalCellsLen2 = totalCellsLen2 / 3; // 2 out of 3 cells are duplicated
    totalCellsLen3 = 0; // All duplicated cells.
    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
      regionServicesForStores.getMemStoreSize());
    // Only 4 unique cells left
    assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD
      + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize());

    mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(4, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  @Test
  public void testMagicCompaction3Buckets() throws IOException {

    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    memstore.getConfiguration()
      .setDouble(AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45);
    memstore.getConfiguration()
      .setInt(AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2);
    memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1);
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
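    // With the ADAPTIVE policy, each in-memory flush chooses between merely flattening/merging
    // the pipeline and compacting it, driven by the estimated fraction of duplicate cells
    // (compared against the 0.45 threshold set above); hence the "or not" assertions below.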

    String[] keys1 = { "A", "B", "D" };
    String[] keys2 = { "A" };
    String[] keys3 = { "A", "A", "B", "C" };
    String[] keys4 = { "D", "B", "B" };

    int totalCellsLen1 = addRowsByKeys(memstore, keys1); // Adding 3 cells.
    int oneCellOnCSLMHeapSize = 120;
    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize;
    assertEquals(totalHeapSize, memstore.heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline - flatten
    assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
    assertEquals(1.0,
      ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    addRowsByKeys(memstore, keys2); // Adding 1 more cell - flatten.
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
    assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
    assertEquals(1.0,
      ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    addRowsByKeys(memstore, keys3); // Adding 4 more cells - merge.
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
    assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
    assertEquals((4.0 / 8.0),
      ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    addRowsByKeys(memstore, keys4); // 3 more cells added - compact (or not)
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells();
    assertTrue(4 == numCells || 11 == numCells);
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    numCells = s.getCellsCount();
    assertTrue(4 == numCells || 11 == numCells);
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

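  /**
   * Adds one cell per key to the active segment, each with a distinct timestamp, accounts the
   * added size through regionServicesForStores, and returns the total data length of the added
   * cells.
   */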
  protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf = Bytes.toBytes("testqualifier");
    long size = hmc.getActive().getDataSize();
    long heapOverhead = hmc.getActive().getHeapSize();
    int cellsCount = hmc.getActive().getCellsCount();
    int totalLen = 0;
    for (int i = 0; i < keys.length; i++) {
      long timestamp = EnvironmentEdgeManager.currentTime();
      Threads.sleep(1); // to make sure each kv gets a different ts
      byte[] row = Bytes.toBytes(keys[i]);
      byte[] val = Bytes.toBytes(keys[i] + i);
      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
      totalLen += Segment.getCellLength(kv);
      hmc.add(kv, null);
      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
    }
    regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
      hmc.getActive().getHeapSize() - heapOverhead, 0,
      hmc.getActive().getCellsCount() - cellsCount);
    return totalLen;
  }

  // Overload that controls the value size when adding new cells.
  protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) {
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf = Bytes.toBytes("testqualifier");
    long size = hmc.getActive().getDataSize();
    long heapOverhead = hmc.getActive().getHeapSize();
    int cellsCount = hmc.getActive().getCellsCount();
    int totalLen = 0;
    for (int i = 0; i < keys.length; i++) {
      long timestamp = EnvironmentEdgeManager.currentTime();
      Threads.sleep(1); // to make sure each kv gets a different ts
      byte[] row = Bytes.toBytes(keys[i]);
      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
      totalLen += Segment.getCellLength(kv);
      hmc.add(kv, null);
      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
    }
    regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
      hmc.getActive().getHeapSize() - heapOverhead, 0,
      hmc.getActive().getCellsCount() - cellsCount);
    return totalLen;
  }

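  /**
   * An EnvironmentEdge that always returns a fixed time (1234), making the timeOfOldestEdit
   * assertions deterministic.
   */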
  private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
    long t = 1234;

    @Override
    public long currentTime() {
      return t;
    }
  }

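  /**
   * A CompactingMemStore that exposes hooks for tests to enable/disable in-memory compaction and
   * to re-initialize the compaction strategy.
   */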
  protected static class MyCompactingMemStore extends CompactingMemStore {

    public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store,
      RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, c, store, regionServices, compactionPolicy);
    }

    void disableCompaction() {
      allowCompaction.set(false);
    }

    void enableCompaction() {
      allowCompaction.set(true);
    }

    void initiateType(MemoryCompactionPolicy compactionType, Configuration conf)
      throws IllegalArgumentIOException {
      compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST");
    }
  }
}