001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.concurrent.Executors;
025import java.util.concurrent.ThreadPoolExecutor;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.MemoryCompactionPolicy;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.RegionInfoBuilder;
038import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
039import org.apache.hadoop.hbase.regionserver.wal.AbstractTestFSWAL;
040import org.apache.hadoop.hbase.testclassification.LargeTests;
041import org.apache.hadoop.hbase.testclassification.RegionServerTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.apache.hadoop.hbase.util.Threads;
044import org.apache.hadoop.hbase.wal.WAL;
045import org.junit.Before;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049import org.mockito.Mockito;
050
051/**
052 * This test verifies the correctness of the Per Column Family flushing strategy when part of the
053 * memstores are compacted memstores
054 */
055@Category({ RegionServerTests.class, LargeTests.class })
056public class TestWalAndCompactingMemStoreFlush {
057
  // JUnit class rule obtained from HBaseClassTestRule for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWalAndCompactingMemStoreFlush.class);

  // Shared testing utility; setup() derives each test's configuration from it.
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  // Parent directory for region data; initHRegion() appends the calling test's name to it.
  // NOTE(review): the "TestHRegion" name looks copied from another test - confirm it is intended.
  private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
  // Table that every region created by initHRegion() belongs to.
  public static final TableName TABLENAME =
    TableName.valueOf("TestWalAndCompactingMemStoreFlush", "t1");

  // The five column families (f1..f5) of the test table. initHRegion() configures the families at
  // even array indices (f1, f3, f5) with the in-memory compaction policy read from the
  // configuration and the remaining ones with MemoryCompactionPolicy.NONE.
  public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
    Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };

  // Shorthands for the first three families, which the tests inspect individually.
  public static final byte[] FAMILY1 = FAMILIES[0];
  public static final byte[] FAMILY2 = FAMILIES[1];
  public static final byte[] FAMILY3 = FAMILIES[2];

  // Per-test configuration; re-created by setup() before every test method.
  private Configuration conf;
075
076  private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
077    int i = 0;
078    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLENAME);
079    for (byte[] family : FAMILIES) {
080      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
081      // even column families are going to have compacted memstore
082      if (i % 2 == 0) {
083        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy
084          .valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY)));
085      } else {
086        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
087      }
088      builder.setColumnFamily(cfBuilder.build());
089      i++;
090    }
091
092    RegionInfo info = RegionInfoBuilder.newBuilder(TABLENAME).build();
093    Path path = new Path(DIR, callingMethod);
094    HRegion region = HBaseTestingUtil.createRegionAndWAL(info, path, conf, builder.build(), false);
095    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
096    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
097    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
098    region.initialize(null);
099    return region;
100  }
101
102  // A helper function to create puts.
103  private Put createPut(int familyNum, int putNum) {
104    byte[] qf = Bytes.toBytes("q" + familyNum);
105    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
106    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
107    Put p = new Put(row);
108    p.addColumn(FAMILIES[familyNum - 1], qf, val);
109    return p;
110  }
111
112  // A helper function to create double puts, so something can be compacted later.
113  private Put createDoublePut(int familyNum, int putNum) {
114    byte[] qf = Bytes.toBytes("q" + familyNum);
115    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
116    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
117    Put p = new Put(row);
118    // add twice with different timestamps
119    p.addColumn(FAMILIES[familyNum - 1], qf, 10, val);
120    p.addColumn(FAMILIES[familyNum - 1], qf, 20, val);
121    return p;
122  }
123
124  private void verifyInMemoryFlushSize(Region region) {
125    assertEquals(
126      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).getInmemoryFlushSize(),
127      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).getInmemoryFlushSize());
128  }
129
130  @Before
131  public void setup() {
132    conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
133    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
134      FlushNonSloppyStoresFirstPolicy.class.getName());
135    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
136  }
137
138  @Test
139  public void testSelectiveFlushWithEager() throws IOException {
140    // Set up the configuration
141    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
142    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
143    // set memstore to do data compaction
144    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
145      String.valueOf(MemoryCompactionPolicy.EAGER));
146
147    // Intialize the region
148    HRegion region = initHRegion("testSelectiveFlushWithEager", conf);
149    verifyInMemoryFlushSize(region);
150    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
151    for (int i = 1; i <= 1200; i++) {
152      region.put(createPut(1, i)); // compacted memstore, all the keys are unique
153
154      if (i <= 100) {
155        region.put(createPut(2, i));
156        if (i <= 50) {
157          // compacted memstore, subject for compaction due to duplications
158          region.put(createDoublePut(3, i));
159        }
160      }
161    }
162
163    // Now add more puts for CF2, so that we only flush CF2 (DefaultMemStore) to disk
164    for (int i = 100; i < 2000; i++) {
165      region.put(createPut(2, i));
166    }
167
168    long totalMemstoreSize = region.getMemStoreDataSize();
169
170    // Find the smallest LSNs for edits wrt to each CF.
171    long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1);
172    long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2);
173    long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3);
174
175    // Find the sizes of the memstores of each CF.
176    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
177    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
178    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
179
180    // Get the overall smallest LSN in the region's memstores.
181    long smallestSeqInRegionCurrentMemstorePhaseI = AbstractTestFSWAL
182      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
183
184    String s = "\n\n----------------------------------\n"
185      + "Upon initial insert and before any flush, size of CF1 is:" + cf1MemstoreSizePhaseI
186      + ", is CF1 compacted memstore?:" + region.getStore(FAMILY1).isSloppyMemStore()
187      + ". Size of CF2 is:" + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:"
188      + region.getStore(FAMILY2).isSloppyMemStore() + ". Size of CF3 is:" + cf3MemstoreSizePhaseI
189      + ", is CF3 compacted memstore?:" + region.getStore(FAMILY3).isSloppyMemStore() + "\n";
190
191    // The overall smallest LSN in the region's memstores should be the same as
192    // the LSN of the smallest edit in CF1
193    assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI);
194
195    // Some other sanity checks.
196    assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI);
197    assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI);
198    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
199    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
200    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
201
202    // The total memstore size should be the same as the sum of the sizes of
203    // memstores of CF1, CF2 and CF3.
204    String msg = "totalMemstoreSize=" + totalMemstoreSize + " cf1MemstoreSizePhaseI="
205      + cf1MemstoreSizePhaseI + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI
206      + " cf3MemstoreSizePhaseI=" + cf3MemstoreSizePhaseI;
207    assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
208      + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
209
210    // Flush!!!!!!!!!!!!!!!!!!!!!!
211    // We have big compacting memstore CF1 and two small memstores:
212    // CF2 (not compacted) and CF3 (compacting)
213    // All together they are above the flush size lower bound.
214    // Since CF1 and CF3 should be flushed to memory (not to disk),
215    // CF2 is going to be flushed to disk.
216    // CF1 - nothing to compact (but flattening), CF3 - should be twice compacted
217    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
218    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
219    cms1.flushInMemory();
220    cms3.flushInMemory();
221    region.flush(false);
222
223    // Recalculate everything
224    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
225    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
226    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
227
228    long smallestSeqInRegionCurrentMemstorePhaseII = AbstractTestFSWAL
229      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
230    // Find the smallest LSNs for edits wrt to each CF.
231    long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
232    long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
233    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
234
235    s = s + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n"
236      + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII
237      + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n";
238
239    // CF1 was flushed to memory, but there is nothing to compact, and CF1 was flattened
240    assertTrue(cf1MemstoreSizePhaseII.getDataSize() == cf1MemstoreSizePhaseI.getDataSize());
241    assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize());
242
243    // CF2 should become empty
244    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
245    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
246
247    // verify that CF3 was flushed to memory and was compacted (this is approximation check)
248    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > cf3MemstoreSizePhaseII.getDataSize());
249    assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 > cf3MemstoreSizePhaseII.getHeapSize());
250
251    // Now the smallest LSN in the region should be the same as the smallest
252    // LSN in the memstore of CF1.
253    assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI);
254
255    // Now add more puts for CF1, so that we also flush CF1 to disk instead of
256    // memory in next flush
257    for (int i = 1200; i < 3000; i++) {
258      region.put(createPut(1, i));
259    }
260
261    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII
262      + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", "
263      + "the smallest sequence in CF2:" + smallestSeqCF2PhaseII + ", the smallest sequence in CF3:"
264      + smallestSeqCF3PhaseII + "\n";
265
266    // How much does the CF1 memstore occupy? Will be used later.
267    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
268    long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
269
270    s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII
271      + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n";
272
273    // Flush!!!!!!!!!!!!!!!!!!!!!!
274    // Flush again, CF1 is flushed to disk
275    // CF2 is flushed to disk, because it is not in-memory compacted memstore
276    // CF3 is flushed empty to memory (actually nothing happens to CF3)
277    region.flush(false);
278
279    // Recalculate everything
280    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
281    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
282    MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
283
284    long smallestSeqInRegionCurrentMemstorePhaseIV = AbstractTestFSWAL
285      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
286    long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
287    long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
288    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
289
290    s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:"
291      + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV + "\n";
292
293    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
294      + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", "
295      + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:"
296      + smallestSeqCF3PhaseIV + "\n";
297
298    // CF1's pipeline component (inserted before first flush) should be flushed to disk
299    // CF2 should be flushed to disk
300    assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
301    assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize());
302    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize());
303
304    // CF3 shouldn't have been touched.
305    assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
306
307    // the smallest LSN of CF3 shouldn't change
308    assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
309
310    // CF3 should be bottleneck for WAL
311    assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV);
312
313    // Flush!!!!!!!!!!!!!!!!!!!!!!
314    // Trying to clean the existing memstores, CF2 all flushed to disk. The single
315    // memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk.
316    region.flush(true);
317
318    // Recalculate everything
319    MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
320    MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
321    MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
322    long smallestSeqInRegionCurrentMemstorePhaseV = AbstractTestFSWAL
323      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
324
325    assertEquals(0, cf1MemstoreSizePhaseV.getDataSize());
326    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize());
327    assertEquals(0, cf2MemstoreSizePhaseV.getDataSize());
328    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize());
329    assertEquals(0, cf3MemstoreSizePhaseV.getDataSize());
330    assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize());
331
332    // What happens when we hit the memstore limit, but we are not able to find
333    // any Column Family above the threshold?
334    // In that case, we should flush all the CFs.
335
336    // The memstore limit is 100*1024 and the column family flush threshold is
337    // around 25*1024. We try to just hit the memstore limit with each CF's
338    // memstore being below the CF flush threshold.
339    for (int i = 1; i <= 300; i++) {
340      region.put(createPut(1, i));
341      region.put(createPut(2, i));
342      region.put(createPut(3, i));
343      region.put(createPut(4, i));
344      region.put(createPut(5, i));
345    }
346
347    region.flush(false);
348
349    s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: "
350      + smallestSeqInRegionCurrentMemstorePhaseV
351      + ". After additional inserts and last flush, the entire region size is:"
352      + region.getMemStoreDataSize() + "\n----------------------------------\n";
353
354    // Since we won't find any CF above the threshold, and hence no specific
355    // store to flush, we should flush all the memstores
356    // Also compacted memstores are flushed to disk.
357    assertEquals(0, region.getMemStoreDataSize());
358    System.out.println(s);
359    HBaseTestingUtil.closeRegionAndWAL(region);
360  }
361
362  /*------------------------------------------------------------------------------*/
363  /* Check the same as above but for index-compaction type of compacting memstore */
364  @Test
365  public void testSelectiveFlushWithIndexCompaction() throws IOException {
366    /*------------------------------------------------------------------------------*/
367    /* SETUP */
368    // Set up the configuration
369    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
370    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
371    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
372    // set memstore to index-compaction
373    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
374      String.valueOf(MemoryCompactionPolicy.BASIC));
375
376    // Initialize the region
377    HRegion region = initHRegion("testSelectiveFlushWithIndexCompaction", conf);
378    verifyInMemoryFlushSize(region);
379    /*------------------------------------------------------------------------------*/
380    /* PHASE I - insertions */
381    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
382    for (int i = 1; i <= 1200; i++) {
383      region.put(createPut(1, i)); // compacted memstore
384      if (i <= 100) {
385        region.put(createPut(2, i));
386        if (i <= 50) {
387          region.put(createDoublePut(3, i)); // subject for in-memory compaction
388        }
389      }
390    }
391    // Now add more puts for CF2, so that we only flush CF2 to disk
392    for (int i = 100; i < 2000; i++) {
393      region.put(createPut(2, i));
394    }
395
396    /*------------------------------------------------------------------------------*/
397    /*------------------------------------------------------------------------------*/
398    /* PHASE I - collect sizes */
399    long totalMemstoreSizePhaseI = region.getMemStoreDataSize();
400    // Find the smallest LSNs for edits wrt to each CF.
401    long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1);
402    long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2);
403    long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3);
404    // Find the sizes of the memstores of each CF.
405    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
406    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
407    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
408    // Get the overall smallest LSN in the region's memstores.
409    long smallestSeqInRegionCurrentMemstorePhaseI = AbstractTestFSWAL
410      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
411
412    /*------------------------------------------------------------------------------*/
413    /* PHASE I - validation */
414    // The overall smallest LSN in the region's memstores should be the same as
415    // the LSN of the smallest edit in CF1
416    assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI);
417    // Some other sanity checks.
418    assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI);
419    assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI);
420    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
421    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
422    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
423
424    // The total memstore size should be the same as the sum of the sizes of
425    // memstores of CF1, CF2 and CF3.
426    assertEquals(totalMemstoreSizePhaseI, cf1MemstoreSizePhaseI.getDataSize()
427      + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
428
429    /*------------------------------------------------------------------------------*/
430    /* PHASE I - Flush */
431    // First Flush in Test!!!!!!!!!!!!!!!!!!!!!!
432    // CF1, CF2, CF3, all together they are above the flush size lower bound.
433    // Since CF1 and CF3 are compacting, CF2 is going to be flushed to disk.
434    // CF1 and CF3 - flushed to memory and flatten explicitly
435    region.flush(false);
436    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
437    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
438    cms1.flushInMemory();
439    cms3.flushInMemory();
440
441    // CF3/CF1 should be merged so wait here to be sure the compaction is done
442    while (
443      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
444        .isMemStoreFlushingInMemory()
445    ) {
446      Threads.sleep(10);
447    }
448    while (
449      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
450        .isMemStoreFlushingInMemory()
451    ) {
452      Threads.sleep(10);
453    }
454
455    /*------------------------------------------------------------------------------*/
456    /*------------------------------------------------------------------------------*/
457    /* PHASE II - collect sizes */
458    // Recalculate everything
459    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
460    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
461    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
462    long smallestSeqInRegionCurrentMemstorePhaseII = AbstractTestFSWAL
463      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
464    // Find the smallest LSNs for edits wrt to each CF.
465    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
466    long totalMemstoreSizePhaseII = region.getMemStoreDataSize();
467
468    /*------------------------------------------------------------------------------*/
469    /* PHASE II - validation */
470    // CF1 was flushed to memory, should be flattened and take less space
471    assertEquals(cf1MemstoreSizePhaseII.getDataSize(), cf1MemstoreSizePhaseI.getDataSize());
472    assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize());
473    // CF2 should become empty
474    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
475    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
476    // verify that CF3 was flushed to memory and was not compacted (this is an approximation check)
477    // if compacted CF# should be at least twice less because its every key was duplicated
478    assertEquals(cf3MemstoreSizePhaseII.getDataSize(), cf3MemstoreSizePhaseI.getDataSize());
479    assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 < cf3MemstoreSizePhaseII.getHeapSize());
480
481    // Now the smallest LSN in the region should be the same as the smallest
482    // LSN in the memstore of CF1.
483    assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI);
484    // The total memstore size should be the same as the sum of the sizes of
485    // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
486    // items in CF1/2
487    assertEquals(totalMemstoreSizePhaseII, cf1MemstoreSizePhaseII.getDataSize()
488      + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
489
490    /*------------------------------------------------------------------------------*/
491    /*------------------------------------------------------------------------------*/
492    /* PHASE III - insertions */
493    // Now add more puts for CF1, so that we also flush CF1 to disk instead of
494    // memory in next flush. This is causing the CF! to be flushed to memory twice.
495    for (int i = 1200; i < 8000; i++) {
496      region.put(createPut(1, i));
497    }
498
499    // CF1 should be flatten and merged so wait here to be sure the compaction is done
500    while (
501      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
502        .isMemStoreFlushingInMemory()
503    ) {
504      Threads.sleep(10);
505    }
506
507    /*------------------------------------------------------------------------------*/
508    /* PHASE III - collect sizes */
509    // How much does the CF1 memstore occupy now? Will be used later.
510    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
511    long totalMemstoreSizePhaseIII = region.getMemStoreDataSize();
512
513    /*------------------------------------------------------------------------------*/
514    /* PHASE III - validation */
515    // The total memstore size should be the same as the sum of the sizes of
516    // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
517    // items in CF1/2
518    assertEquals(totalMemstoreSizePhaseIII, cf1MemstoreSizePhaseIII.getDataSize()
519      + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
520
521    /*------------------------------------------------------------------------------*/
522    /* PHASE III - Flush */
523    // Second Flush in Test!!!!!!!!!!!!!!!!!!!!!!
524    // CF1 is flushed to disk, but not entirely emptied.
525    // CF2 was and remained empty, same way nothing happens to CF3
526    region.flush(false);
527
528    /*------------------------------------------------------------------------------*/
529    /*------------------------------------------------------------------------------*/
530    /* PHASE IV - collect sizes */
531    // Recalculate everything
532    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
533    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
534    MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
535    long smallestSeqInRegionCurrentMemstorePhaseIV = AbstractTestFSWAL
536      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
537    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
538
539    /*------------------------------------------------------------------------------*/
540    /* PHASE IV - validation */
541    // CF1's biggest pipeline component (inserted before first flush) should be flushed to disk
542    // CF2 should remain empty
543    assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
544    assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize());
545    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize());
546    // CF3 shouldn't have been touched.
547    assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
548    // the smallest LSN of CF3 shouldn't change
549    assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
550    // CF3 should be bottleneck for WAL
551    assertEquals(smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV);
552
553    /*------------------------------------------------------------------------------*/
554    /* PHASE IV - Flush */
555    // Third Flush in Test!!!!!!!!!!!!!!!!!!!!!!
556    // Force flush to disk on all memstores (flush parameter true).
557    // CF1/CF3 all flushed to disk. Note that active sets of CF1 and CF3 are empty
558    region.flush(true);
559
560    /*------------------------------------------------------------------------------*/
561    /*------------------------------------------------------------------------------*/
562    /* PHASE V - collect sizes */
563    // Recalculate everything
564    MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
565    MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
566    MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
567    long smallestSeqInRegionCurrentMemstorePhaseV = AbstractTestFSWAL
568      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
569    long totalMemstoreSizePhaseV = region.getMemStoreDataSize();
570
571    /*------------------------------------------------------------------------------*/
572    /* PHASE V - validation */
573    assertEquals(0, cf1MemstoreSizePhaseV.getDataSize());
574    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize());
575    assertEquals(0, cf2MemstoreSizePhaseV.getDataSize());
576    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize());
577    assertEquals(0, cf3MemstoreSizePhaseV.getDataSize());
578    assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize());
579    // The total memstores size should be empty
580    assertEquals(0, totalMemstoreSizePhaseV);
581    // Because there is nothing in any memstore the WAL's LSN should be -1
582    assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstorePhaseV);
583
584    // What happens when we hit the memstore limit, but we are not able to find
585    // any Column Family above the threshold?
586    // In that case, we should flush all the CFs.
587
588    /*------------------------------------------------------------------------------*/
589    /*------------------------------------------------------------------------------*/
590    /* PHASE VI - insertions */
591    // The memstore limit is 200*1024 and the column family flush threshold is
592    // around 50*1024. We try to just hit the memstore limit with each CF's
593    // memstore being below the CF flush threshold.
594    for (int i = 1; i <= 300; i++) {
595      region.put(createPut(1, i));
596      region.put(createPut(2, i));
597      region.put(createPut(3, i));
598      region.put(createPut(4, i));
599      region.put(createPut(5, i));
600    }
601
602    MemStoreSize cf1ActiveSizePhaseVI = region.getStore(FAMILY1).getMemStoreSize();
603    MemStoreSize cf3ActiveSizePhaseVI = region.getStore(FAMILY3).getMemStoreSize();
604    MemStoreSize cf5ActiveSizePhaseVI = region.getStore(FAMILIES[4]).getMemStoreSize();
605
606    /*------------------------------------------------------------------------------*/
607    /* PHASE VI - Flush */
608    // Fourth Flush in Test!!!!!!!!!!!!!!!!!!!!!!
    // None of the compacting memstores was flushed in memory by the previous puts,
    // but they are going to be moved to the pipeline and flattened due to the flush.
611    region.flush(false);
612    // Since we won't find any CF above the threshold, and hence no specific
613    // store to flush, we should flush all the memstores
614    // Also compacted memstores are flushed to disk, but not entirely emptied
615    MemStoreSize cf1ActiveSizePhaseVII = region.getStore(FAMILY1).getMemStoreSize();
616    MemStoreSize cf3ActiveSizePhaseVII = region.getStore(FAMILY3).getMemStoreSize();
617    MemStoreSize cf5ActiveSizePhaseVII = region.getStore(FAMILIES[4]).getMemStoreSize();
618
619    assertTrue(cf1ActiveSizePhaseVII.getDataSize() < cf1ActiveSizePhaseVI.getDataSize());
620    assertTrue(cf3ActiveSizePhaseVII.getDataSize() < cf3ActiveSizePhaseVI.getDataSize());
621    assertTrue(cf5ActiveSizePhaseVII.getDataSize() < cf5ActiveSizePhaseVI.getDataSize());
622
623    HBaseTestingUtil.closeRegionAndWAL(region);
624  }
625
626  @Test
627  public void testSelectiveFlushAndWALinDataCompaction() throws IOException {
628    // Set up the configuration
629    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
630    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
631    // set memstore to do data compaction and not to use the speculative scan
632    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
633      String.valueOf(MemoryCompactionPolicy.EAGER));
634
635    // Intialize the HRegion
636    HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
637    verifyInMemoryFlushSize(region);
638    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
639    for (int i = 1; i <= 1200; i++) {
640      region.put(createPut(1, i));
641      if (i <= 100) {
642        region.put(createPut(2, i));
643        if (i <= 50) {
644          region.put(createPut(3, i));
645        }
646      }
647    }
648    // Now add more puts for CF2, so that we only flush CF2 to disk
649    for (int i = 100; i < 2000; i++) {
650      region.put(createPut(2, i));
651    }
652
653    // in this test check the non-composite snapshot - flashing only tail of the pipeline
654    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).setCompositeSnapshot(false);
655    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).setCompositeSnapshot(false);
656
657    long totalMemstoreSize = region.getMemStoreDataSize();
658
659    // Find the sizes of the memstores of each CF.
660    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
661    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
662    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
663
664    // Some other sanity checks.
665    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
666    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
667    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
668
669    // The total memstore size should be the same as the sum of the sizes of
670    // memstores of CF1, CF2 and CF3.
671    String msg = "totalMemstoreSize=" + totalMemstoreSize + " DefaultMemStore.DEEP_OVERHEAD="
672      + DefaultMemStore.DEEP_OVERHEAD + " cf1MemstoreSizePhaseI=" + cf1MemstoreSizePhaseI
673      + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI + " cf3MemstoreSizePhaseI="
674      + cf3MemstoreSizePhaseI;
675    assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
676      + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
677
678    // Flush!
679    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
680    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
681    cms1.flushInMemory();
682    cms3.flushInMemory();
683    region.flush(false);
684
685    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
686
687    long smallestSeqInRegionCurrentMemstorePhaseII = AbstractTestFSWAL
688      .getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes());
689    long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
690    long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
691    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
692
693    // CF2 should have been cleared
694    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
695    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
696
697    String s = "\n\n----------------------------------\n"
698      + "Upon initial insert and flush, LSN of CF1 is:" + smallestSeqCF1PhaseII + ". LSN of CF2 is:"
699      + smallestSeqCF2PhaseII + ". LSN of CF3 is:" + smallestSeqCF3PhaseII
700      + ", smallestSeqInRegionCurrentMemstore:" + smallestSeqInRegionCurrentMemstorePhaseII + "\n";
701
702    // Add same entries to compact them later
703    for (int i = 1; i <= 1200; i++) {
704      region.put(createPut(1, i));
705      if (i <= 100) {
706        region.put(createPut(2, i));
707        if (i <= 50) {
708          region.put(createPut(3, i));
709        }
710      }
711    }
712    // Now add more puts for CF2, so that we only flush CF2 to disk
713    for (int i = 100; i < 2000; i++) {
714      region.put(createPut(2, i));
715    }
716
717    long smallestSeqInRegionCurrentMemstorePhaseIII = AbstractTestFSWAL
718      .getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes());
719    long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
720    long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2);
721    long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3);
722
723    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIII
724      + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIII + ", "
725      + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIII + ", the smallest sequence in CF3:"
726      + smallestSeqCF3PhaseIII + "\n";
727
728    // Flush!
729    cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
730    cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
731    cms1.flushInMemory();
732    cms3.flushInMemory();
733    region.flush(false);
734
735    long smallestSeqInRegionCurrentMemstorePhaseIV = AbstractTestFSWAL
736      .getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes());
737    long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
738    long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
739    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
740
741    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
742      + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", "
743      + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:"
744      + smallestSeqCF3PhaseIV + "\n";
745
746    // now check that the LSN of the entire WAL, of CF1 and of CF3 has progressed due to compaction
747    assertTrue(s,
748      smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII);
749    assertTrue(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII);
750    assertTrue(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII);
751
752    HBaseTestingUtil.closeRegionAndWAL(region);
753  }
754
755  @Test
756  public void testSelectiveFlushWithBasicAndMerge() throws IOException {
757    // Set up the configuration
758    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
759    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
760    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.8);
761    // set memstore to do index compaction with merge
762    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
763      String.valueOf(MemoryCompactionPolicy.BASIC));
764    // length of pipeline that requires merge
765    conf.setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
766
767    // Intialize the HRegion
768    HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf);
769    verifyInMemoryFlushSize(region);
770    // Add 1200 entries for CF1 (CompactingMemStore), 100 for CF2 (DefaultMemStore) and 50 for CF3
771    for (int i = 1; i <= 1200; i++) {
772      region.put(createPut(1, i));
773      if (i <= 100) {
774        region.put(createPut(2, i));
775        if (i <= 50) {
776          region.put(createPut(3, i));
777        }
778      }
779    }
780    // Now put more entries to CF2
781    for (int i = 100; i < 2000; i++) {
782      region.put(createPut(2, i));
783    }
784
785    long totalMemstoreSize = region.getMemStoreDataSize();
786
787    // test in-memory flashing into CAM here
788    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
789      .setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
790    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
791      .setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
792
793    // Find the sizes of the memstores of each CF.
794    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
795    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
796    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
797
798    // Some other sanity checks.
799    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
800    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
801    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
802
803    // The total memstore size should be the same as the sum of the sizes of
804    // memstores of CF1, CF2 and CF3.
805    assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
806      + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
807
808    // Initiate in-memory Flush!
809    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory();
810    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory();
811    // CF1 and CF3 should be flatten and merged so wait here to be sure the merge is done
812    while (
813      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
814        .isMemStoreFlushingInMemory()
815    ) {
816      Threads.sleep(10);
817    }
818    while (
819      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
820        .isMemStoreFlushingInMemory()
821    ) {
822      Threads.sleep(10);
823    }
824
825    // Flush-to-disk! CF2 only should be flushed
826    region.flush(false);
827
828    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
829    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
830    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
831
832    // CF1 should be flushed in memory and just flattened, so CF1 heap overhead should be smaller
833    assertTrue(cf1MemstoreSizePhaseI.getHeapSize() > cf1MemstoreSizePhaseII.getHeapSize());
834    // CF1 should be flushed in memory and just flattened, so CF1 data size should remain the same
835    assertEquals(cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseII.getDataSize());
836    // CF2 should have been cleared
837    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
838
839    // Add the same amount of entries to see the merging
840    for (int i = 1; i <= 1200; i++) {
841      region.put(createPut(1, i));
842      if (i <= 100) {
843        region.put(createPut(2, i));
844        if (i <= 50) {
845          region.put(createPut(3, i));
846        }
847      }
848    }
849    // Now add more puts for CF2, so that we only flush CF2 to disk
850    for (int i = 100; i < 2000; i++) {
851      region.put(createPut(2, i));
852    }
853
854    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
855
856    // Flush in memory!
857    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory();
858    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory();
859    // CF1 and CF3 should be merged so wait here to be sure the merge is done
860    while (
861      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
862        .isMemStoreFlushingInMemory()
863    ) {
864      Threads.sleep(10);
865    }
866    while (
867      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
868        .isMemStoreFlushingInMemory()
869    ) {
870      Threads.sleep(10);
871    }
872    region.flush(false);
873
874    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
875    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
876
877    assertEquals(2 * cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize());
878    // the decrease in the heap size due to usage of CellArrayMap instead of CSLM
879    // should be the same in flattening and in merge (first and second in-memory-flush)
880    // but in phase 1 we do not yet have immutable segment
881    assertEquals(cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(),
882      cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize()
883        - CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
884    assertEquals(3, // active, one in pipeline, snapshot
885      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).getSegments().size());
886    // CF2 should have been cleared
887    assertEquals("\n<<< DEBUG: The data--heap sizes of stores before/after first flushes,"
888      + " CF1: " + cf1MemstoreSizePhaseI.getDataSize() + "/" + cf1MemstoreSizePhaseII.getDataSize()
889      + "--" + cf1MemstoreSizePhaseI.getHeapSize() + "/" + cf1MemstoreSizePhaseII.getHeapSize()
890      + ", CF2: " + cf2MemstoreSizePhaseI.getDataSize() + "/" + cf2MemstoreSizePhaseII.getDataSize()
891      + "--" + cf2MemstoreSizePhaseI.getHeapSize() + "/" + cf2MemstoreSizePhaseII.getHeapSize()
892      + ", CF3: " + cf3MemstoreSizePhaseI.getDataSize() + "/" + cf3MemstoreSizePhaseII.getDataSize()
893      + "--" + cf3MemstoreSizePhaseI.getHeapSize() + "/" + cf3MemstoreSizePhaseII.getHeapSize()
894      + "\n<<< AND before/after second flushes " + " CF1: " + cf1MemstoreSizePhaseIII.getDataSize()
895      + "/" + cf1MemstoreSizePhaseIV.getDataSize() + "--" + cf1MemstoreSizePhaseIII.getHeapSize()
896      + "/" + cf1MemstoreSizePhaseIV.getHeapSize() + "\n", 0, cf2MemstoreSizePhaseIV.getDataSize());
897
898    HBaseTestingUtil.closeRegionAndWAL(region);
899  }
900
901  // should end in 300 seconds (5 minutes)
902  @Test
903  public void testStressFlushAndWALinIndexCompaction() throws IOException {
904    // Set up the configuration
905    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
906    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
907      200 * 1024);
908    // set memstore to do data compaction and not to use the speculative scan
909    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
910      String.valueOf(MemoryCompactionPolicy.BASIC));
911
912    // Successfully initialize the HRegion
913    HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
914    verifyInMemoryFlushSize(region);
915    Thread[] threads = new Thread[25];
916    for (int i = 0; i < threads.length; i++) {
917      int id = i * 10000;
918      ConcurrentPutRunnable runnable = new ConcurrentPutRunnable(region, id);
919      threads[i] = new Thread(runnable);
920      threads[i].start();
921    }
922    Threads.sleep(10000); // let other threads start
923    region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts
924    Threads.sleep(10000); // let other threads continue
925    region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts
926
927    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory();
928    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory();
929    while (
930      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
931        .isMemStoreFlushingInMemory()
932    ) {
933      Threads.sleep(10);
934    }
935    while (
936      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
937        .isMemStoreFlushingInMemory()
938    ) {
939      Threads.sleep(10);
940    }
941
942    for (int i = 0; i < threads.length; i++) {
943      try {
944        threads[i].join();
945      } catch (InterruptedException e) {
946        e.printStackTrace();
947      }
948    }
949  }
950
951  /**
952   * The in-memory-flusher thread performs the flush asynchronously. There is at most one thread per
953   * memstore instance. It takes the updatesLock exclusively, pushes active into the pipeline,
954   * releases updatesLock and compacts the pipeline.
955   */
956  private class ConcurrentPutRunnable implements Runnable {
957    private final HRegion stressedRegion;
958    private final int startNumber;
959
960    ConcurrentPutRunnable(HRegion r, int i) {
961      this.stressedRegion = r;
962      this.startNumber = i;
963    }
964
965    @Override
966    public void run() {
967
968      try {
969        int dummy = startNumber / 10000;
970        System.out.print("Thread " + dummy + " with start number " + startNumber + " starts\n");
971        // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
972        for (int i = startNumber; i <= startNumber + 3000; i++) {
973          stressedRegion.put(createPut(1, i));
974          if (i <= startNumber + 2000) {
975            stressedRegion.put(createPut(2, i));
976            if (i <= startNumber + 1000) {
977              stressedRegion.put(createPut(3, i));
978            }
979          }
980        }
981        System.out.print("Thread with start number " + startNumber + " continues to more puts\n");
982        // Now add more puts for CF2, so that we only flush CF2 to disk
983        for (int i = startNumber + 3000; i < startNumber + 5000; i++) {
984          stressedRegion.put(createPut(2, i));
985        }
986        // And add more puts for CF1
987        for (int i = startNumber + 5000; i < startNumber + 7000; i++) {
988          stressedRegion.put(createPut(1, i));
989        }
990        System.out.print("Thread with start number " + startNumber + " flushes\n");
991        // flush (IN MEMORY) one of the stores (each thread flushes different store)
992        // and wait till the flush and the following action are done
993        if (startNumber == 0) {
994          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore)
995            .flushInMemory();
996          while (
997            ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore)
998              .isMemStoreFlushingInMemory()
999          ) {
1000            Threads.sleep(10);
1001          }
1002        }
1003        if (startNumber == 10000) {
1004          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore)
1005            .flushInMemory();
1006          while (
1007            ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore)
1008              .isMemStoreFlushingInMemory()
1009          ) {
1010            Threads.sleep(10);
1011          }
1012        }
1013        if (startNumber == 20000) {
1014          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore)
1015            .flushInMemory();
1016          while (
1017            ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore)
1018              .isMemStoreFlushingInMemory()
1019          ) {
1020            Threads.sleep(10);
1021          }
1022        }
1023        System.out.print("Thread with start number " + startNumber + " finishes\n");
1024      } catch (IOException e) {
1025        assert false;
1026      }
1027    }
1028  }
1029
1030  private WAL getWAL(Region region) {
1031    return ((HRegion) region).getWAL();
1032  }
1033}