001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.Arrays;
027import java.util.List;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.NamespaceDescriptor;
035import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.Waiter;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.ConnectionFactory;
042import org.apache.hadoop.hbase.client.Get;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.RegionInfoBuilder;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.client.TableDescriptor;
049import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
050import org.apache.hadoop.hbase.regionserver.wal.AbstractTestFSWAL;
051import org.apache.hadoop.hbase.testclassification.LargeTests;
052import org.apache.hadoop.hbase.testclassification.RegionServerTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.JVMClusterUtil;
055import org.apache.hadoop.hbase.util.Pair;
056import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
057import org.apache.hadoop.hbase.wal.WAL;
058import org.junit.ClassRule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064import org.apache.hbase.thirdparty.com.google.common.hash.Hashing;
065
066/**
067 * This test verifies the correctness of the Per Column Family flushing strategy
068 */
069@Category({ RegionServerTests.class, LargeTests.class })
070public class TestPerColumnFamilyFlush {
071
  // Class-level JUnit rule obtained from HBaseClassTestRule for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPerColumnFamilyFlush.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestPerColumnFamilyFlush.class);

  // Shared testing utility; the mini-cluster based tests start and stop their cluster through it.
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  // Parent directory of the standalone regions created by initHRegion().
  // NOTE(review): the directory name "TestHRegion" looks like a copy/paste leftover - confirm.
  private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");

  // Table used by the tests; the mini-cluster tests create its namespace before the table.
  public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");

  // All column families; createPut() and verifyEdit() address them by 1-based index.
  public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
    Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };

  // Shortcut for the first family ("f1").
  public static final byte[] FAMILY1 = FAMILIES[0];

  // Shortcut for the second family ("f2").
  public static final byte[] FAMILY2 = FAMILIES[1];

  // Shortcut for the third family ("f3").
  public static final byte[] FAMILY3 = FAMILIES[2];
092
093  private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
094    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLENAME);
095    for (byte[] family : FAMILIES) {
096      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
097    }
098    RegionInfo info = RegionInfoBuilder.newBuilder(TABLENAME).build();
099    Path path = new Path(DIR, callingMethod);
100    return HBaseTestingUtil.createRegionAndWAL(info, path, conf, builder.build());
101  }
102
103  // A helper function to create puts.
104  private Put createPut(int familyNum, int putNum) {
105    byte[] qf = Bytes.toBytes("q" + familyNum);
106    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
107    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
108    Put p = new Put(row);
109    p.addColumn(FAMILIES[familyNum - 1], qf, val);
110    return p;
111  }
112
113  // A helper function to create puts.
114  private Get createGet(int familyNum, int putNum) {
115    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
116    return new Get(row);
117  }
118
119  // A helper function to verify edits.
120  void verifyEdit(int familyNum, int putNum, Table table) throws IOException {
121    Result r = table.get(createGet(familyNum, putNum));
122    byte[] family = FAMILIES[familyNum - 1];
123    byte[] qf = Bytes.toBytes("q" + familyNum);
124    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
125    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
126    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
127      r.getFamilyMap(family).get(qf));
128    assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
129      Arrays.equals(r.getFamilyMap(family).get(qf), val));
130  }
131
132  @Test
133  public void testSelectiveFlushWhenEnabled() throws IOException {
134    // Set up the configuration, use new one to not conflict with minicluster in other tests
135    Configuration conf = new HBaseTestingUtil().getConfiguration();
136    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
137    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
138    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 40 * 1024);
139    // Intialize the region
140    HRegion region = initHRegion("testSelectiveFlushWithDataCompaction", conf);
141    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
142    for (int i = 1; i <= 1200; i++) {
143      region.put(createPut(1, i));
144
145      if (i <= 100) {
146        region.put(createPut(2, i));
147        if (i <= 50) {
148          region.put(createPut(3, i));
149        }
150      }
151    }
152
153    long totalMemstoreSize = region.getMemStoreDataSize();
154
155    // Find the smallest LSNs for edits wrt to each CF.
156    long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1);
157    long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2);
158    long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3);
159
160    // Find the sizes of the memstores of each CF.
161    MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
162    MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
163    MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
164
165    // Get the overall smallest LSN in the region's memstores.
166    long smallestSeqInRegionCurrentMemstore = AbstractTestFSWAL
167      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
168
169    // The overall smallest LSN in the region's memstores should be the same as
170    // the LSN of the smallest edit in CF1
171    assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
172
173    // Some other sanity checks.
174    assertTrue(smallestSeqCF1 < smallestSeqCF2);
175    assertTrue(smallestSeqCF2 < smallestSeqCF3);
176    assertTrue(cf1MemstoreSize.getDataSize() > 0);
177    assertTrue(cf2MemstoreSize.getDataSize() > 0);
178    assertTrue(cf3MemstoreSize.getDataSize() > 0);
179
180    // The total memstore size should be the same as the sum of the sizes of
181    // memstores of CF1, CF2 and CF3.
182    assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize()
183      + cf3MemstoreSize.getDataSize());
184
185    // Flush!
186    region.flush(false);
187
188    // Will use these to check if anything changed.
189    MemStoreSize oldCF2MemstoreSize = cf2MemstoreSize;
190    MemStoreSize oldCF3MemstoreSize = cf3MemstoreSize;
191
192    // Recalculate everything
193    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
194    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
195    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
196    totalMemstoreSize = region.getMemStoreDataSize();
197    smallestSeqInRegionCurrentMemstore = AbstractTestFSWAL.getEarliestMemStoreSeqNum(getWAL(region),
198      region.getRegionInfo().getEncodedNameAsBytes());
199
200    // We should have cleared out only CF1, since we chose the flush thresholds
201    // and number of puts accordingly.
202    assertEquals(0, cf1MemstoreSize.getDataSize());
203    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
204    // Nothing should have happened to CF2, ...
205    assertEquals(cf2MemstoreSize, oldCF2MemstoreSize);
206    // ... or CF3
207    assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
208    // Now the smallest LSN in the region should be the same as the smallest
209    // LSN in the memstore of CF2.
210    assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2);
211    // Of course, this should hold too.
212    assertEquals(totalMemstoreSize, cf2MemstoreSize.getDataSize() + cf3MemstoreSize.getDataSize());
213
214    // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
215    for (int i = 1200; i < 2400; i++) {
216      region.put(createPut(2, i));
217
218      // Add only 100 puts for CF3
219      if (i - 1200 < 100) {
220        region.put(createPut(3, i));
221      }
222    }
223
224    // How much does the CF3 memstore occupy? Will be used later.
225    oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
226
227    // Flush again
228    region.flush(false);
229
230    // Recalculate everything
231    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
232    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
233    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
234    totalMemstoreSize = region.getMemStoreDataSize();
235    smallestSeqInRegionCurrentMemstore = AbstractTestFSWAL.getEarliestMemStoreSeqNum(getWAL(region),
236      region.getRegionInfo().getEncodedNameAsBytes());
237
238    // CF1 and CF2, both should be absent.
239    assertEquals(0, cf1MemstoreSize.getDataSize());
240    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
241    assertEquals(0, cf2MemstoreSize.getDataSize());
242    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
243    // CF3 shouldn't have been touched.
244    assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
245    assertEquals(totalMemstoreSize, cf3MemstoreSize.getDataSize());
246    assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF3);
247
248    // What happens when we hit the memstore limit, but we are not able to find
249    // any Column Family above the threshold?
250    // In that case, we should flush all the CFs.
251
252    // Clearing the existing memstores.
253    region.flush(true);
254
255    // The memstore limit is 200*1024 and the column family flush threshold is
256    // around 50*1024. We try to just hit the memstore limit with each CF's
257    // memstore being below the CF flush threshold.
258    for (int i = 1; i <= 300; i++) {
259      region.put(createPut(1, i));
260      region.put(createPut(2, i));
261      region.put(createPut(3, i));
262      region.put(createPut(4, i));
263      region.put(createPut(5, i));
264    }
265
266    region.flush(false);
267
268    // Since we won't find any CF above the threshold, and hence no specific
269    // store to flush, we should flush all the memstores.
270    assertEquals(0, region.getMemStoreDataSize());
271    HBaseTestingUtil.closeRegionAndWAL(region);
272  }
273
274  @Test
275  public void testSelectiveFlushWhenNotEnabled() throws IOException {
276    // Set up the configuration, use new one to not conflict with minicluster in other tests
277    Configuration conf = new HBaseTestingUtil().getConfiguration();
278    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
279    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
280
281    // Intialize the HRegion
282    HRegion region = initHRegion("testSelectiveFlushWhenNotEnabled", conf);
283    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
284    for (int i = 1; i <= 1200; i++) {
285      region.put(createPut(1, i));
286      if (i <= 100) {
287        region.put(createPut(2, i));
288        if (i <= 50) {
289          region.put(createPut(3, i));
290        }
291      }
292    }
293
294    long totalMemstoreSize = region.getMemStoreDataSize();
295
296    // Find the sizes of the memstores of each CF.
297    MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
298    MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
299    MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
300
301    // Some other sanity checks.
302    assertTrue(cf1MemstoreSize.getDataSize() > 0);
303    assertTrue(cf2MemstoreSize.getDataSize() > 0);
304    assertTrue(cf3MemstoreSize.getDataSize() > 0);
305
306    // The total memstore size should be the same as the sum of the sizes of
307    // memstores of CF1, CF2 and CF3.
308    assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize()
309      + cf3MemstoreSize.getDataSize());
310
311    // Flush!
312    region.flush(false);
313
314    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
315    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
316    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
317    totalMemstoreSize = region.getMemStoreDataSize();
318    long smallestSeqInRegionCurrentMemstore = AbstractTestFSWAL
319      .getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes());
320
321    // Everything should have been cleared
322    assertEquals(0, cf1MemstoreSize.getDataSize());
323    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
324    assertEquals(0, cf2MemstoreSize.getDataSize());
325    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
326    assertEquals(0, cf3MemstoreSize.getDataSize());
327    assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSize.getHeapSize());
328    assertEquals(0, totalMemstoreSize);
329    assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
330    HBaseTestingUtil.closeRegionAndWAL(region);
331  }
332
333  // Find the (first) region which has the specified name.
334  private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) {
335    SingleProcessHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
336    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
337    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
338      HRegionServer hrs = rsts.get(i).getRegionServer();
339      for (HRegion region : hrs.getRegions(tableName)) {
340        return Pair.newPair(region, hrs);
341      }
342    }
343    return null;
344  }
345
346  private void doTestLogReplay() throws Exception {
347    Configuration conf = TEST_UTIL.getConfiguration();
348    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 10000);
349    // Carefully chosen limits so that the memstore just flushes when we're done
350    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
351    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 2500);
352    final int numRegionServers = 4;
353    try {
354      TEST_UTIL.startMiniCluster(numRegionServers);
355      TEST_UTIL.getAdmin()
356        .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
357      Table table = TEST_UTIL.createTable(TABLENAME, FAMILIES);
358
359      // Add 100 edits for CF1, 20 for CF2, 20 for CF3.
360      // These will all be interleaved in the log.
361      for (int i = 1; i <= 80; i++) {
362        table.put(createPut(1, i));
363        if (i <= 10) {
364          table.put(createPut(2, i));
365          table.put(createPut(3, i));
366        }
367      }
368      Thread.sleep(1000);
369
370      Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME);
371      HRegion desiredRegion = desiredRegionAndServer.getFirst();
372      assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
373
374      // Flush the region selectively.
375      desiredRegion.flush(false);
376
377      long totalMemstoreSize;
378      long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
379      totalMemstoreSize = desiredRegion.getMemStoreDataSize();
380
381      // Find the sizes of the memstores of each CF.
382      cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize().getDataSize();
383      cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize().getDataSize();
384      cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize().getDataSize();
385
386      // CF1 Should have been flushed
387      assertEquals(0, cf1MemstoreSize);
388      // CF2 and CF3 shouldn't have been flushed.
389      // TODO: This test doesn't allow for this case:
390      // " Since none of the CFs were above the size, flushing all."
391      // i.e. a flush happens before we get to here and its a flush-all.
392      assertTrue(cf2MemstoreSize >= 0);
393      assertTrue(cf3MemstoreSize >= 0);
394      assertEquals(totalMemstoreSize, cf2MemstoreSize + cf3MemstoreSize);
395
396      // Wait for the RS report to go across to the master, so that the master
397      // is aware of which sequence ids have been flushed, before we kill the RS.
398      // If in production, the RS dies before the report goes across, we will
399      // safely replay all the edits.
400      Thread.sleep(2000);
401
402      // Abort the region server where we have the region hosted.
403      HRegionServer rs = desiredRegionAndServer.getSecond();
404      rs.abort("testing");
405
406      // The aborted region server's regions will be eventually assigned to some
407      // other region server, and the get RPC call (inside verifyEdit()) will
408      // retry for some time till the regions come back up.
409
410      // Verify that all the edits are safe.
411      for (int i = 1; i <= 80; i++) {
412        verifyEdit(1, i, table);
413        if (i <= 10) {
414          verifyEdit(2, i, table);
415          verifyEdit(3, i, table);
416        }
417      }
418    } finally {
419      TEST_UTIL.shutdownMiniCluster();
420    }
421  }
422
423  // Test Log Replay with Distributed log split on.
424  @Test
425  public void testLogReplayWithDistributedLogSplit() throws Exception {
426    doTestLogReplay();
427  }
428
429  private WAL getWAL(Region region) {
430    return ((HRegion) region).getWAL();
431  }
432
433  private int getNumRolledLogFiles(Region region) {
434    return AbstractFSWALProvider.getNumRolledLogFiles(getWAL(region));
435  }
436
437  /**
438   * When a log roll is about to happen, we do a flush of the regions who will be affected by the
439   * log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This
440   * test ensures that we do a full-flush in that scenario.
441   */
442  @Test
443  public void testFlushingWhenLogRolling() throws Exception {
444    TableName tableName = TableName.valueOf("testFlushingWhenLogRolling");
445    Configuration conf = TEST_UTIL.getConfiguration();
446    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
447    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
448    long cfFlushSizeLowerBound = 2048;
449    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
450      cfFlushSizeLowerBound);
451
452    // One hour, prevent periodic rolling
453    conf.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000);
454    // prevent rolling by size
455    conf.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024);
456    // Make it 10 as max logs before a flush comes on.
457    final int maxLogs = 10;
458    conf.setInt("hbase.regionserver.maxlogs", maxLogs);
459
460    final int numRegionServers = 1;
461    TEST_UTIL.startMiniCluster(numRegionServers);
462    try {
463      Table table = TEST_UTIL.createTable(tableName, FAMILIES);
464      Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(tableName);
465      final HRegion desiredRegion = desiredRegionAndServer.getFirst();
466      assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
467      LOG.info("Writing to region=" + desiredRegion);
468
469      // Add one row for both CFs.
470      for (int i = 1; i <= 3; i++) {
471        table.put(createPut(i, 0));
472      }
473      // Now only add row to CF1, make sure when we force a flush, CF1 is larger than the lower
474      // bound and CF2 and CF3 are smaller than the lower bound.
475      for (int i = 0; i < maxLogs; i++) {
476        for (int j = 0; j < 100; j++) {
477          table.put(createPut(1, i * 100 + j));
478        }
479        // Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
480        int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
481        assertNull(getWAL(desiredRegion).rollWriter());
482        TEST_UTIL.waitFor(60000,
483          () -> getNumRolledLogFiles(desiredRegion) > currentNumRolledLogFiles);
484      }
485      assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
486      assertTrue(
487        desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize() > cfFlushSizeLowerBound);
488      assertTrue(
489        desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound);
490      assertTrue(
491        desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound);
492      table.put(createPut(1, 12345678));
493      // Make numRolledLogFiles greater than maxLogs
494      desiredRegionAndServer.getSecond().getWalRoller().requestRollAll();
495      // Wait for some time till the flush caused by log rolling happens.
496      TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() {
497
498        @Override
499        public boolean evaluate() throws Exception {
500          return desiredRegion.getMemStoreDataSize() == 0;
501        }
502
503        @Override
504        public String explainFailure() throws Exception {
505          long memstoreSize = desiredRegion.getMemStoreDataSize();
506          if (memstoreSize > 0) {
507            return "Still have unflushed entries in memstore, memstore size is " + memstoreSize;
508          }
509          return "Unknown";
510        }
511      });
512      LOG.info("Finished waiting on flush after too many WALs...");
513      // Individual families should have been flushed.
514      assertEquals(MutableSegment.DEEP_OVERHEAD,
515        desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize());
516      assertEquals(MutableSegment.DEEP_OVERHEAD,
517        desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize());
518      assertEquals(MutableSegment.DEEP_OVERHEAD,
519        desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize());
520      // let WAL cleanOldLogs
521      assertNull(getWAL(desiredRegion).rollWriter(true));
522      TEST_UTIL.waitFor(60000, () -> getNumRolledLogFiles(desiredRegion) < maxLogs);
523    } finally {
524      TEST_UTIL.shutdownMiniCluster();
525    }
526  }
527
528  private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException {
529    Region region = getRegionWithName(table.getName()).getFirst();
530    // cf1 4B per row, cf2 40B per row and cf3 400B per row
531    byte[] qf = Bytes.toBytes("qf");
532    for (int i = 0; i < 10000; i++) {
533      Put put = new Put(Bytes.toBytes("row-" + i));
534      byte[] value1 = new byte[100];
535      Bytes.random(value1);
536      put.addColumn(FAMILY1, qf, value1);
537      byte[] value2 = new byte[200];
538      Bytes.random(value2);
539      put.addColumn(FAMILY2, qf, value2);
540      byte[] value3 = new byte[400];
541      Bytes.random(value3);
542      put.addColumn(FAMILY3, qf, value3);
543      table.put(put);
544      // slow down to let regionserver flush region.
545      while (region.getMemStoreHeapSize() > memstoreFlushSize) {
546        Thread.sleep(100);
547      }
548    }
549  }
550
551  // Under the same write load, small stores should have less store files when
552  // percolumnfamilyflush enabled.
553  @Test
554  public void testCompareStoreFileCount() throws Exception {
555    long memstoreFlushSize = 1024L * 1024;
556    Configuration conf = TEST_UTIL.getConfiguration();
557    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, memstoreFlushSize);
558    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
559    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
560    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
561      ConstantSizeRegionSplitPolicy.class.getName());
562
563    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
564      .setCompactionEnabled(false).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY1))
565      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY2))
566      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY3)).build();
567
568    LOG.info("==============Test with selective flush disabled===============");
569    int cf1StoreFileCount = -1;
570    int cf2StoreFileCount = -1;
571    int cf3StoreFileCount = -1;
572    int cf1StoreFileCount1 = -1;
573    int cf2StoreFileCount1 = -1;
574    int cf3StoreFileCount1 = -1;
575    try {
576      TEST_UTIL.startMiniCluster(1);
577      TEST_UTIL.getAdmin()
578        .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
579      TEST_UTIL.getAdmin().createTable(tableDescriptor);
580      TEST_UTIL.waitTableAvailable(TABLENAME);
581      Connection conn = ConnectionFactory.createConnection(conf);
582      Table table = conn.getTable(TABLENAME);
583      doPut(table, memstoreFlushSize);
584      table.close();
585      conn.close();
586
587      Region region = getRegionWithName(TABLENAME).getFirst();
588      cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount();
589      cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount();
590      cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount();
591    } finally {
592      TEST_UTIL.shutdownMiniCluster();
593    }
594
595    LOG.info("==============Test with selective flush enabled===============");
596    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
597    // default value of per-cf flush lower bound is too big, set to a small enough value
598    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 0);
599    try {
600      TEST_UTIL.startMiniCluster(1);
601      TEST_UTIL.getAdmin()
602        .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
603      TEST_UTIL.getAdmin().createTable(tableDescriptor);
604      Connection conn = ConnectionFactory.createConnection(conf);
605      Table table = conn.getTable(TABLENAME);
606      doPut(table, memstoreFlushSize);
607      table.close();
608      conn.close();
609
610      Region region = getRegionWithName(TABLENAME).getFirst();
611      cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount();
612      cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount();
613      cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount();
614    } finally {
615      TEST_UTIL.shutdownMiniCluster();
616    }
617
618    LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount + ", "
619      + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", " + Bytes.toString(FAMILY3) + "=>"
620      + cf3StoreFileCount);
621    LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1 + ", "
622      + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", " + Bytes.toString(FAMILY3) + "=>"
623      + cf3StoreFileCount1);
624    // small CF will have less store files.
625    assertTrue(cf1StoreFileCount1 < cf1StoreFileCount);
626    assertTrue(cf2StoreFileCount1 < cf2StoreFileCount);
627  }
628
629  public static void main(String[] args) throws Exception {
630    int numRegions = Integer.parseInt(args[0]);
631    long numRows = Long.parseLong(args[1]);
632
633    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
634      .setMaxFileSize(10L * 1024 * 1024 * 1024)
635      .setValue(TableDescriptorBuilder.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName())
636      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY1))
637      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY2))
638      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY3)).build();
639
640    Configuration conf = HBaseConfiguration.create();
641    Connection conn = ConnectionFactory.createConnection(conf);
642    Admin admin = conn.getAdmin();
643    if (admin.tableExists(TABLENAME)) {
644      admin.disableTable(TABLENAME);
645      admin.deleteTable(TABLENAME);
646    }
647    if (numRegions >= 3) {
648      byte[] startKey = new byte[16];
649      byte[] endKey = new byte[16];
650      Arrays.fill(endKey, (byte) 0xFF);
651      admin.createTable(tableDescriptor, startKey, endKey, numRegions);
652    } else {
653      admin.createTable(tableDescriptor);
654    }
655    admin.close();
656
657    Table table = conn.getTable(TABLENAME);
658    byte[] qf = Bytes.toBytes("qf");
659    byte[] value1 = new byte[16];
660    byte[] value2 = new byte[256];
661    byte[] value3 = new byte[4096];
662    for (long i = 0; i < numRows; i++) {
663      Put put = new Put(Hashing.md5().hashLong(i).asBytes());
664      Bytes.random(value1);
665      Bytes.random(value2);
666      Bytes.random(value3);
667      put.addColumn(FAMILY1, qf, value1);
668      put.addColumn(FAMILY2, qf, value2);
669      put.addColumn(FAMILY3, qf, value3);
670      table.put(put);
671      if (i % 10000 == 0) {
672        LOG.info(i + " rows put");
673      }
674    }
675    table.close();
676    conn.close();
677  }
678}