001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.Arrays; 027import java.util.List; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.HBaseTestingUtil; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.NamespaceDescriptor; 035import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.Waiter; 038import org.apache.hadoop.hbase.client.Admin; 039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 040import org.apache.hadoop.hbase.client.Connection; 041import org.apache.hadoop.hbase.client.ConnectionFactory; 042import org.apache.hadoop.hbase.client.Get; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import org.apache.hadoop.hbase.client.RegionInfoBuilder; 046import org.apache.hadoop.hbase.client.Result; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.client.TableDescriptor; 049import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 050import org.apache.hadoop.hbase.regionserver.wal.AbstractTestFSWAL; 051import org.apache.hadoop.hbase.testclassification.LargeTests; 052import org.apache.hadoop.hbase.testclassification.RegionServerTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.JVMClusterUtil; 055import org.apache.hadoop.hbase.util.Pair; 056import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 057import org.apache.hadoop.hbase.wal.WAL; 058import org.junit.ClassRule; 059import org.junit.Test; 060import org.junit.experimental.categories.Category; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064import org.apache.hbase.thirdparty.com.google.common.hash.Hashing; 065 066/** 067 * This test verifies the correctness of the Per Column Family flushing strategy 068 */ 069@Category({ RegionServerTests.class, LargeTests.class }) 070public class TestPerColumnFamilyFlush { 071 072 @ClassRule 073 public static final HBaseClassTestRule CLASS_RULE = 074 HBaseClassTestRule.forClass(TestPerColumnFamilyFlush.class); 075 076 private static final Logger LOG = LoggerFactory.getLogger(TestPerColumnFamilyFlush.class); 077 078 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 079 080 private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion"); 081 082 public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1"); 083 084 public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), 085 Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") }; 086 087 public static final byte[] FAMILY1 = FAMILIES[0]; 088 089 public static final byte[] FAMILY2 = FAMILIES[1]; 090 091 public static final byte[] FAMILY3 = FAMILIES[2]; 092 093 private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException { 094 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLENAME); 095 for (byte[] family : FAMILIES) { 096 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 097 } 098 RegionInfo info = RegionInfoBuilder.newBuilder(TABLENAME).build(); 099 Path path = new Path(DIR, callingMethod); 100 return HBaseTestingUtil.createRegionAndWAL(info, path, conf, builder.build()); 101 } 102 103 // A helper function to create puts. 104 private Put createPut(int familyNum, int putNum) { 105 byte[] qf = Bytes.toBytes("q" + familyNum); 106 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); 107 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); 108 Put p = new Put(row); 109 p.addColumn(FAMILIES[familyNum - 1], qf, val); 110 return p; 111 } 112 113 // A helper function to create puts. 114 private Get createGet(int familyNum, int putNum) { 115 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); 116 return new Get(row); 117 } 118 119 // A helper function to verify edits. 120 void verifyEdit(int familyNum, int putNum, Table table) throws IOException { 121 Result r = table.get(createGet(familyNum, putNum)); 122 byte[] family = FAMILIES[familyNum - 1]; 123 byte[] qf = Bytes.toBytes("q" + familyNum); 124 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); 125 assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family)); 126 assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), 127 r.getFamilyMap(family).get(qf)); 128 assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum), 129 Arrays.equals(r.getFamilyMap(family).get(qf), val)); 130 } 131 132 @Test 133 public void testSelectiveFlushWhenEnabled() throws IOException { 134 // Set up the configuration, use new one to not conflict with minicluster in other tests 135 Configuration conf = new HBaseTestingUtil().getConfiguration(); 136 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024); 137 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName()); 138 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 40 * 1024); 139 // Intialize the region 140 HRegion region = initHRegion("testSelectiveFlushWithDataCompaction", conf); 141 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 142 for (int i = 1; i <= 1200; i++) { 143 region.put(createPut(1, i)); 144 145 if (i <= 100) { 146 region.put(createPut(2, i)); 147 if (i <= 50) { 148 region.put(createPut(3, i)); 149 } 150 } 151 } 152 153 long totalMemstoreSize = region.getMemStoreDataSize(); 154 155 // Find the smallest LSNs for edits wrt to each CF. 156 long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1); 157 long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2); 158 long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3); 159 160 // Find the sizes of the memstores of each CF. 161 MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); 162 MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); 163 MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 164 165 // Get the overall smallest LSN in the region's memstores. 166 long smallestSeqInRegionCurrentMemstore = AbstractTestFSWAL 167 .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes()); 168 169 // The overall smallest LSN in the region's memstores should be the same as 170 // the LSN of the smallest edit in CF1 171 assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore); 172 173 // Some other sanity checks. 174 assertTrue(smallestSeqCF1 < smallestSeqCF2); 175 assertTrue(smallestSeqCF2 < smallestSeqCF3); 176 assertTrue(cf1MemstoreSize.getDataSize() > 0); 177 assertTrue(cf2MemstoreSize.getDataSize() > 0); 178 assertTrue(cf3MemstoreSize.getDataSize() > 0); 179 180 // The total memstore size should be the same as the sum of the sizes of 181 // memstores of CF1, CF2 and CF3. 182 assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize() 183 + cf3MemstoreSize.getDataSize()); 184 185 // Flush! 186 region.flush(false); 187 188 // Will use these to check if anything changed. 189 MemStoreSize oldCF2MemstoreSize = cf2MemstoreSize; 190 MemStoreSize oldCF3MemstoreSize = cf3MemstoreSize; 191 192 // Recalculate everything 193 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); 194 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); 195 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 196 totalMemstoreSize = region.getMemStoreDataSize(); 197 smallestSeqInRegionCurrentMemstore = AbstractTestFSWAL.getEarliestMemStoreSeqNum(getWAL(region), 198 region.getRegionInfo().getEncodedNameAsBytes()); 199 200 // We should have cleared out only CF1, since we chose the flush thresholds 201 // and number of puts accordingly. 202 assertEquals(0, cf1MemstoreSize.getDataSize()); 203 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize()); 204 // Nothing should have happened to CF2, ... 205 assertEquals(cf2MemstoreSize, oldCF2MemstoreSize); 206 // ... or CF3 207 assertEquals(cf3MemstoreSize, oldCF3MemstoreSize); 208 // Now the smallest LSN in the region should be the same as the smallest 209 // LSN in the memstore of CF2. 210 assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2); 211 // Of course, this should hold too. 212 assertEquals(totalMemstoreSize, cf2MemstoreSize.getDataSize() + cf3MemstoreSize.getDataSize()); 213 214 // Now add more puts (mostly for CF2), so that we only flush CF2 this time. 215 for (int i = 1200; i < 2400; i++) { 216 region.put(createPut(2, i)); 217 218 // Add only 100 puts for CF3 219 if (i - 1200 < 100) { 220 region.put(createPut(3, i)); 221 } 222 } 223 224 // How much does the CF3 memstore occupy? Will be used later. 225 oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 226 227 // Flush again 228 region.flush(false); 229 230 // Recalculate everything 231 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); 232 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); 233 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 234 totalMemstoreSize = region.getMemStoreDataSize(); 235 smallestSeqInRegionCurrentMemstore = AbstractTestFSWAL.getEarliestMemStoreSeqNum(getWAL(region), 236 region.getRegionInfo().getEncodedNameAsBytes()); 237 238 // CF1 and CF2, both should be absent. 239 assertEquals(0, cf1MemstoreSize.getDataSize()); 240 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize()); 241 assertEquals(0, cf2MemstoreSize.getDataSize()); 242 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize()); 243 // CF3 shouldn't have been touched. 244 assertEquals(cf3MemstoreSize, oldCF3MemstoreSize); 245 assertEquals(totalMemstoreSize, cf3MemstoreSize.getDataSize()); 246 assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF3); 247 248 // What happens when we hit the memstore limit, but we are not able to find 249 // any Column Family above the threshold? 250 // In that case, we should flush all the CFs. 251 252 // Clearing the existing memstores. 253 region.flush(true); 254 255 // The memstore limit is 200*1024 and the column family flush threshold is 256 // around 50*1024. We try to just hit the memstore limit with each CF's 257 // memstore being below the CF flush threshold. 258 for (int i = 1; i <= 300; i++) { 259 region.put(createPut(1, i)); 260 region.put(createPut(2, i)); 261 region.put(createPut(3, i)); 262 region.put(createPut(4, i)); 263 region.put(createPut(5, i)); 264 } 265 266 region.flush(false); 267 268 // Since we won't find any CF above the threshold, and hence no specific 269 // store to flush, we should flush all the memstores. 270 assertEquals(0, region.getMemStoreDataSize()); 271 HBaseTestingUtil.closeRegionAndWAL(region); 272 } 273 274 @Test 275 public void testSelectiveFlushWhenNotEnabled() throws IOException { 276 // Set up the configuration, use new one to not conflict with minicluster in other tests 277 Configuration conf = new HBaseTestingUtil().getConfiguration(); 278 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024); 279 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName()); 280 281 // Intialize the HRegion 282 HRegion region = initHRegion("testSelectiveFlushWhenNotEnabled", conf); 283 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 284 for (int i = 1; i <= 1200; i++) { 285 region.put(createPut(1, i)); 286 if (i <= 100) { 287 region.put(createPut(2, i)); 288 if (i <= 50) { 289 region.put(createPut(3, i)); 290 } 291 } 292 } 293 294 long totalMemstoreSize = region.getMemStoreDataSize(); 295 296 // Find the sizes of the memstores of each CF. 297 MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); 298 MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); 299 MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 300 301 // Some other sanity checks. 302 assertTrue(cf1MemstoreSize.getDataSize() > 0); 303 assertTrue(cf2MemstoreSize.getDataSize() > 0); 304 assertTrue(cf3MemstoreSize.getDataSize() > 0); 305 306 // The total memstore size should be the same as the sum of the sizes of 307 // memstores of CF1, CF2 and CF3. 308 assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize() 309 + cf3MemstoreSize.getDataSize()); 310 311 // Flush! 312 region.flush(false); 313 314 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); 315 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); 316 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 317 totalMemstoreSize = region.getMemStoreDataSize(); 318 long smallestSeqInRegionCurrentMemstore = AbstractTestFSWAL 319 .getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes()); 320 321 // Everything should have been cleared 322 assertEquals(0, cf1MemstoreSize.getDataSize()); 323 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize()); 324 assertEquals(0, cf2MemstoreSize.getDataSize()); 325 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize()); 326 assertEquals(0, cf3MemstoreSize.getDataSize()); 327 assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSize.getHeapSize()); 328 assertEquals(0, totalMemstoreSize); 329 assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore); 330 HBaseTestingUtil.closeRegionAndWAL(region); 331 } 332 333 // Find the (first) region which has the specified name. 334 private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) { 335 SingleProcessHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); 336 List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads(); 337 for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { 338 HRegionServer hrs = rsts.get(i).getRegionServer(); 339 for (HRegion region : hrs.getRegions(tableName)) { 340 return Pair.newPair(region, hrs); 341 } 342 } 343 return null; 344 } 345 346 private void doTestLogReplay() throws Exception { 347 Configuration conf = TEST_UTIL.getConfiguration(); 348 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 10000); 349 // Carefully chosen limits so that the memstore just flushes when we're done 350 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName()); 351 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 2500); 352 final int numRegionServers = 4; 353 try { 354 TEST_UTIL.startMiniCluster(numRegionServers); 355 TEST_UTIL.getAdmin() 356 .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); 357 Table table = TEST_UTIL.createTable(TABLENAME, FAMILIES); 358 359 // Add 100 edits for CF1, 20 for CF2, 20 for CF3. 360 // These will all be interleaved in the log. 361 for (int i = 1; i <= 80; i++) { 362 table.put(createPut(1, i)); 363 if (i <= 10) { 364 table.put(createPut(2, i)); 365 table.put(createPut(3, i)); 366 } 367 } 368 Thread.sleep(1000); 369 370 Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME); 371 HRegion desiredRegion = desiredRegionAndServer.getFirst(); 372 assertTrue("Could not find a region which hosts the new region.", desiredRegion != null); 373 374 // Flush the region selectively. 375 desiredRegion.flush(false); 376 377 long totalMemstoreSize; 378 long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize; 379 totalMemstoreSize = desiredRegion.getMemStoreDataSize(); 380 381 // Find the sizes of the memstores of each CF. 382 cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize().getDataSize(); 383 cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize().getDataSize(); 384 cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize().getDataSize(); 385 386 // CF1 Should have been flushed 387 assertEquals(0, cf1MemstoreSize); 388 // CF2 and CF3 shouldn't have been flushed. 389 // TODO: This test doesn't allow for this case: 390 // " Since none of the CFs were above the size, flushing all." 391 // i.e. a flush happens before we get to here and its a flush-all. 392 assertTrue(cf2MemstoreSize >= 0); 393 assertTrue(cf3MemstoreSize >= 0); 394 assertEquals(totalMemstoreSize, cf2MemstoreSize + cf3MemstoreSize); 395 396 // Wait for the RS report to go across to the master, so that the master 397 // is aware of which sequence ids have been flushed, before we kill the RS. 398 // If in production, the RS dies before the report goes across, we will 399 // safely replay all the edits. 400 Thread.sleep(2000); 401 402 // Abort the region server where we have the region hosted. 403 HRegionServer rs = desiredRegionAndServer.getSecond(); 404 rs.abort("testing"); 405 406 // The aborted region server's regions will be eventually assigned to some 407 // other region server, and the get RPC call (inside verifyEdit()) will 408 // retry for some time till the regions come back up. 409 410 // Verify that all the edits are safe. 411 for (int i = 1; i <= 80; i++) { 412 verifyEdit(1, i, table); 413 if (i <= 10) { 414 verifyEdit(2, i, table); 415 verifyEdit(3, i, table); 416 } 417 } 418 } finally { 419 TEST_UTIL.shutdownMiniCluster(); 420 } 421 } 422 423 // Test Log Replay with Distributed log split on. 424 @Test 425 public void testLogReplayWithDistributedLogSplit() throws Exception { 426 doTestLogReplay(); 427 } 428 429 private WAL getWAL(Region region) { 430 return ((HRegion) region).getWAL(); 431 } 432 433 private int getNumRolledLogFiles(Region region) { 434 return AbstractFSWALProvider.getNumRolledLogFiles(getWAL(region)); 435 } 436 437 /** 438 * When a log roll is about to happen, we do a flush of the regions who will be affected by the 439 * log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This 440 * test ensures that we do a full-flush in that scenario. 441 */ 442 @Test 443 public void testFlushingWhenLogRolling() throws Exception { 444 TableName tableName = TableName.valueOf("testFlushingWhenLogRolling"); 445 Configuration conf = TEST_UTIL.getConfiguration(); 446 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024); 447 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName()); 448 long cfFlushSizeLowerBound = 2048; 449 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 450 cfFlushSizeLowerBound); 451 452 // One hour, prevent periodic rolling 453 conf.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000); 454 // prevent rolling by size 455 conf.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024); 456 // Make it 10 as max logs before a flush comes on. 457 final int maxLogs = 10; 458 conf.setInt("hbase.regionserver.maxlogs", maxLogs); 459 460 final int numRegionServers = 1; 461 TEST_UTIL.startMiniCluster(numRegionServers); 462 try { 463 Table table = TEST_UTIL.createTable(tableName, FAMILIES); 464 Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(tableName); 465 final HRegion desiredRegion = desiredRegionAndServer.getFirst(); 466 assertTrue("Could not find a region which hosts the new region.", desiredRegion != null); 467 LOG.info("Writing to region=" + desiredRegion); 468 469 // Add one row for both CFs. 470 for (int i = 1; i <= 3; i++) { 471 table.put(createPut(i, 0)); 472 } 473 // Now only add row to CF1, make sure when we force a flush, CF1 is larger than the lower 474 // bound and CF2 and CF3 are smaller than the lower bound. 475 for (int i = 0; i < maxLogs; i++) { 476 for (int j = 0; j < 100; j++) { 477 table.put(createPut(1, i * 100 + j)); 478 } 479 // Roll the WAL. The log file count is less than maxLogs so no flush is triggered. 480 int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion); 481 assertNull(getWAL(desiredRegion).rollWriter()); 482 TEST_UTIL.waitFor(60000, 483 () -> getNumRolledLogFiles(desiredRegion) > currentNumRolledLogFiles); 484 } 485 assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion)); 486 assertTrue( 487 desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize() > cfFlushSizeLowerBound); 488 assertTrue( 489 desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound); 490 assertTrue( 491 desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound); 492 table.put(createPut(1, 12345678)); 493 // Make numRolledLogFiles greater than maxLogs 494 desiredRegionAndServer.getSecond().getWalRoller().requestRollAll(); 495 // Wait for some time till the flush caused by log rolling happens. 496 TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() { 497 498 @Override 499 public boolean evaluate() throws Exception { 500 return desiredRegion.getMemStoreDataSize() == 0; 501 } 502 503 @Override 504 public String explainFailure() throws Exception { 505 long memstoreSize = desiredRegion.getMemStoreDataSize(); 506 if (memstoreSize > 0) { 507 return "Still have unflushed entries in memstore, memstore size is " + memstoreSize; 508 } 509 return "Unknown"; 510 } 511 }); 512 LOG.info("Finished waiting on flush after too many WALs..."); 513 // Individual families should have been flushed. 514 assertEquals(MutableSegment.DEEP_OVERHEAD, 515 desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize()); 516 assertEquals(MutableSegment.DEEP_OVERHEAD, 517 desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize()); 518 assertEquals(MutableSegment.DEEP_OVERHEAD, 519 desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize()); 520 // let WAL cleanOldLogs 521 assertNull(getWAL(desiredRegion).rollWriter(true)); 522 TEST_UTIL.waitFor(60000, () -> getNumRolledLogFiles(desiredRegion) < maxLogs); 523 } finally { 524 TEST_UTIL.shutdownMiniCluster(); 525 } 526 } 527 528 private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException { 529 Region region = getRegionWithName(table.getName()).getFirst(); 530 // cf1 4B per row, cf2 40B per row and cf3 400B per row 531 byte[] qf = Bytes.toBytes("qf"); 532 for (int i = 0; i < 10000; i++) { 533 Put put = new Put(Bytes.toBytes("row-" + i)); 534 byte[] value1 = new byte[100]; 535 Bytes.random(value1); 536 put.addColumn(FAMILY1, qf, value1); 537 byte[] value2 = new byte[200]; 538 Bytes.random(value2); 539 put.addColumn(FAMILY2, qf, value2); 540 byte[] value3 = new byte[400]; 541 Bytes.random(value3); 542 put.addColumn(FAMILY3, qf, value3); 543 table.put(put); 544 // slow down to let regionserver flush region. 545 while (region.getMemStoreHeapSize() > memstoreFlushSize) { 546 Thread.sleep(100); 547 } 548 } 549 } 550 551 // Under the same write load, small stores should have less store files when 552 // percolumnfamilyflush enabled. 553 @Test 554 public void testCompareStoreFileCount() throws Exception { 555 long memstoreFlushSize = 1024L * 1024; 556 Configuration conf = TEST_UTIL.getConfiguration(); 557 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, memstoreFlushSize); 558 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName()); 559 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); 560 conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, 561 ConstantSizeRegionSplitPolicy.class.getName()); 562 563 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME) 564 .setCompactionEnabled(false).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY1)) 565 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY2)) 566 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY3)).build(); 567 568 LOG.info("==============Test with selective flush disabled==============="); 569 int cf1StoreFileCount = -1; 570 int cf2StoreFileCount = -1; 571 int cf3StoreFileCount = -1; 572 int cf1StoreFileCount1 = -1; 573 int cf2StoreFileCount1 = -1; 574 int cf3StoreFileCount1 = -1; 575 try { 576 TEST_UTIL.startMiniCluster(1); 577 TEST_UTIL.getAdmin() 578 .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); 579 TEST_UTIL.getAdmin().createTable(tableDescriptor); 580 TEST_UTIL.waitTableAvailable(TABLENAME); 581 Connection conn = ConnectionFactory.createConnection(conf); 582 Table table = conn.getTable(TABLENAME); 583 doPut(table, memstoreFlushSize); 584 table.close(); 585 conn.close(); 586 587 Region region = getRegionWithName(TABLENAME).getFirst(); 588 cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount(); 589 cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount(); 590 cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount(); 591 } finally { 592 TEST_UTIL.shutdownMiniCluster(); 593 } 594 595 LOG.info("==============Test with selective flush enabled==============="); 596 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName()); 597 // default value of per-cf flush lower bound is too big, set to a small enough value 598 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 0); 599 try { 600 TEST_UTIL.startMiniCluster(1); 601 TEST_UTIL.getAdmin() 602 .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); 603 TEST_UTIL.getAdmin().createTable(tableDescriptor); 604 Connection conn = ConnectionFactory.createConnection(conf); 605 Table table = conn.getTable(TABLENAME); 606 doPut(table, memstoreFlushSize); 607 table.close(); 608 conn.close(); 609 610 Region region = getRegionWithName(TABLENAME).getFirst(); 611 cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount(); 612 cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount(); 613 cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount(); 614 } finally { 615 TEST_UTIL.shutdownMiniCluster(); 616 } 617 618 LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount + ", " 619 + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", " + Bytes.toString(FAMILY3) + "=>" 620 + cf3StoreFileCount); 621 LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1 + ", " 622 + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", " + Bytes.toString(FAMILY3) + "=>" 623 + cf3StoreFileCount1); 624 // small CF will have less store files. 625 assertTrue(cf1StoreFileCount1 < cf1StoreFileCount); 626 assertTrue(cf2StoreFileCount1 < cf2StoreFileCount); 627 } 628 629 public static void main(String[] args) throws Exception { 630 int numRegions = Integer.parseInt(args[0]); 631 long numRows = Long.parseLong(args[1]); 632 633 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME) 634 .setMaxFileSize(10L * 1024 * 1024 * 1024) 635 .setValue(TableDescriptorBuilder.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName()) 636 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY1)) 637 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY2)) 638 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY3)).build(); 639 640 Configuration conf = HBaseConfiguration.create(); 641 Connection conn = ConnectionFactory.createConnection(conf); 642 Admin admin = conn.getAdmin(); 643 if (admin.tableExists(TABLENAME)) { 644 admin.disableTable(TABLENAME); 645 admin.deleteTable(TABLENAME); 646 } 647 if (numRegions >= 3) { 648 byte[] startKey = new byte[16]; 649 byte[] endKey = new byte[16]; 650 Arrays.fill(endKey, (byte) 0xFF); 651 admin.createTable(tableDescriptor, startKey, endKey, numRegions); 652 } else { 653 admin.createTable(tableDescriptor); 654 } 655 admin.close(); 656 657 Table table = conn.getTable(TABLENAME); 658 byte[] qf = Bytes.toBytes("qf"); 659 byte[] value1 = new byte[16]; 660 byte[] value2 = new byte[256]; 661 byte[] value3 = new byte[4096]; 662 for (long i = 0; i < numRows; i++) { 663 Put put = new Put(Hashing.md5().hashLong(i).asBytes()); 664 Bytes.random(value1); 665 Bytes.random(value2); 666 Bytes.random(value3); 667 put.addColumn(FAMILY1, qf, value1); 668 put.addColumn(FAMILY2, qf, value2); 669 put.addColumn(FAMILY3, qf, value3); 670 table.put(put); 671 if (i % 10000 == 0) { 672 LOG.info(i + " rows put"); 673 } 674 } 675 table.close(); 676 conn.close(); 677 } 678}