001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.concurrent.Executors; 025import java.util.concurrent.ThreadPoolExecutor; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.MemoryCompactionPolicy; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 035import org.apache.hadoop.hbase.client.Put; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.client.RegionInfoBuilder; 038import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 039import org.apache.hadoop.hbase.regionserver.wal.AbstractTestFSWAL; 040import org.apache.hadoop.hbase.testclassification.LargeTests; 041import org.apache.hadoop.hbase.testclassification.RegionServerTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.apache.hadoop.hbase.util.Threads; 044import org.apache.hadoop.hbase.wal.WAL; 045import org.junit.Before; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049import org.mockito.Mockito; 050 051/** 052 * This test verifies the correctness of the Per Column Family flushing strategy when part of the 053 * memstores are compacted memstores 054 */ 055@Category({ RegionServerTests.class, LargeTests.class }) 056public class TestWalAndCompactingMemStoreFlush { 057 058 @ClassRule 059 public static final HBaseClassTestRule CLASS_RULE = 060 HBaseClassTestRule.forClass(TestWalAndCompactingMemStoreFlush.class); 061 062 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 063 private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion"); 064 public static final TableName TABLENAME = 065 TableName.valueOf("TestWalAndCompactingMemStoreFlush", "t1"); 066 067 public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), 068 Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") }; 069 070 public static final byte[] FAMILY1 = FAMILIES[0]; 071 public static final byte[] FAMILY2 = FAMILIES[1]; 072 public static final byte[] FAMILY3 = FAMILIES[2]; 073 074 private Configuration conf; 075 076 private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException { 077 int i = 0; 078 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLENAME); 079 for (byte[] family : FAMILIES) { 080 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 081 // even column families are going to have compacted memstore 082 if (i % 2 == 0) { 083 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy 084 .valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY))); 085 } else { 086 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE); 087 } 088 builder.setColumnFamily(cfBuilder.build()); 089 i++; 090 } 091 092 RegionInfo info = RegionInfoBuilder.newBuilder(TABLENAME).build(); 093 Path path = new Path(DIR, callingMethod); 094 HRegion region = HBaseTestingUtil.createRegionAndWAL(info, path, conf, builder.build(), false); 095 region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); 096 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 097 Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 098 region.initialize(null); 099 return region; 100 } 101 102 // A helper function to create puts. 103 private Put createPut(int familyNum, int putNum) { 104 byte[] qf = Bytes.toBytes("q" + familyNum); 105 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); 106 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); 107 Put p = new Put(row); 108 p.addColumn(FAMILIES[familyNum - 1], qf, val); 109 return p; 110 } 111 112 // A helper function to create double puts, so something can be compacted later. 113 private Put createDoublePut(int familyNum, int putNum) { 114 byte[] qf = Bytes.toBytes("q" + familyNum); 115 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); 116 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); 117 Put p = new Put(row); 118 // add twice with different timestamps 119 p.addColumn(FAMILIES[familyNum - 1], qf, 10, val); 120 p.addColumn(FAMILIES[familyNum - 1], qf, 20, val); 121 return p; 122 } 123 124 private void verifyInMemoryFlushSize(Region region) { 125 assertEquals( 126 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).getInmemoryFlushSize(), 127 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).getInmemoryFlushSize()); 128 } 129 130 @Before 131 public void setup() { 132 conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 133 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, 134 FlushNonSloppyStoresFirstPolicy.class.getName()); 135 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); 136 } 137 138 @Test 139 public void testSelectiveFlushWithEager() throws IOException { 140 // Set up the configuration 141 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 142 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 143 // set memstore to do data compaction 144 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 145 String.valueOf(MemoryCompactionPolicy.EAGER)); 146 147 // Intialize the region 148 HRegion region = initHRegion("testSelectiveFlushWithEager", conf); 149 verifyInMemoryFlushSize(region); 150 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 151 for (int i = 1; i <= 1200; i++) { 152 region.put(createPut(1, i)); // compacted memstore, all the keys are unique 153 154 if (i <= 100) { 155 region.put(createPut(2, i)); 156 if (i <= 50) { 157 // compacted memstore, subject for compaction due to duplications 158 region.put(createDoublePut(3, i)); 159 } 160 } 161 } 162 163 // Now add more puts for CF2, so that we only flush CF2 (DefaultMemStore) to disk 164 for (int i = 100; i < 2000; i++) { 165 region.put(createPut(2, i)); 166 } 167 168 long totalMemstoreSize = region.getMemStoreDataSize(); 169 170 // Find the smallest LSNs for edits wrt to each CF. 171 long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1); 172 long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2); 173 long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3); 174 175 // Find the sizes of the memstores of each CF. 176 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 177 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 178 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 179 180 // Get the overall smallest LSN in the region's memstores. 181 long smallestSeqInRegionCurrentMemstorePhaseI = AbstractTestFSWAL 182 .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes()); 183 184 String s = "\n\n----------------------------------\n" 185 + "Upon initial insert and before any flush, size of CF1 is:" + cf1MemstoreSizePhaseI 186 + ", is CF1 compacted memstore?:" + region.getStore(FAMILY1).isSloppyMemStore() 187 + ". Size of CF2 is:" + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:" 188 + region.getStore(FAMILY2).isSloppyMemStore() + ". Size of CF3 is:" + cf3MemstoreSizePhaseI 189 + ", is CF3 compacted memstore?:" + region.getStore(FAMILY3).isSloppyMemStore() + "\n"; 190 191 // The overall smallest LSN in the region's memstores should be the same as 192 // the LSN of the smallest edit in CF1 193 assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI); 194 195 // Some other sanity checks. 196 assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI); 197 assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI); 198 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 199 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 200 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 201 202 // The total memstore size should be the same as the sum of the sizes of 203 // memstores of CF1, CF2 and CF3. 204 String msg = "totalMemstoreSize=" + totalMemstoreSize + " cf1MemstoreSizePhaseI=" 205 + cf1MemstoreSizePhaseI + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI 206 + " cf3MemstoreSizePhaseI=" + cf3MemstoreSizePhaseI; 207 assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() 208 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); 209 210 // Flush!!!!!!!!!!!!!!!!!!!!!! 211 // We have big compacting memstore CF1 and two small memstores: 212 // CF2 (not compacted) and CF3 (compacting) 213 // All together they are above the flush size lower bound. 214 // Since CF1 and CF3 should be flushed to memory (not to disk), 215 // CF2 is going to be flushed to disk. 216 // CF1 - nothing to compact (but flattening), CF3 - should be twice compacted 217 CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 218 CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 219 cms1.flushInMemory(); 220 cms3.flushInMemory(); 221 region.flush(false); 222 223 // Recalculate everything 224 MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); 225 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 226 MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); 227 228 long smallestSeqInRegionCurrentMemstorePhaseII = AbstractTestFSWAL 229 .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes()); 230 // Find the smallest LSNs for edits wrt to each CF. 231 long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); 232 long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); 233 long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); 234 235 s = s + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n" 236 + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII 237 + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n"; 238 239 // CF1 was flushed to memory, but there is nothing to compact, and CF1 was flattened 240 assertTrue(cf1MemstoreSizePhaseII.getDataSize() == cf1MemstoreSizePhaseI.getDataSize()); 241 assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize()); 242 243 // CF2 should become empty 244 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 245 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize()); 246 247 // verify that CF3 was flushed to memory and was compacted (this is approximation check) 248 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > cf3MemstoreSizePhaseII.getDataSize()); 249 assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 > cf3MemstoreSizePhaseII.getHeapSize()); 250 251 // Now the smallest LSN in the region should be the same as the smallest 252 // LSN in the memstore of CF1. 253 assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI); 254 255 // Now add more puts for CF1, so that we also flush CF1 to disk instead of 256 // memory in next flush 257 for (int i = 1200; i < 3000; i++) { 258 region.put(createPut(1, i)); 259 } 260 261 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII 262 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " 263 + "the smallest sequence in CF2:" + smallestSeqCF2PhaseII + ", the smallest sequence in CF3:" 264 + smallestSeqCF3PhaseII + "\n"; 265 266 // How much does the CF1 memstore occupy? Will be used later. 267 MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); 268 long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); 269 270 s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII 271 + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n"; 272 273 // Flush!!!!!!!!!!!!!!!!!!!!!! 274 // Flush again, CF1 is flushed to disk 275 // CF2 is flushed to disk, because it is not in-memory compacted memstore 276 // CF3 is flushed empty to memory (actually nothing happens to CF3) 277 region.flush(false); 278 279 // Recalculate everything 280 MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); 281 MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); 282 MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize(); 283 284 long smallestSeqInRegionCurrentMemstorePhaseIV = AbstractTestFSWAL 285 .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes()); 286 long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); 287 long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); 288 long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); 289 290 s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:" 291 + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV + "\n"; 292 293 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV 294 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " 295 + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:" 296 + smallestSeqCF3PhaseIV + "\n"; 297 298 // CF1's pipeline component (inserted before first flush) should be flushed to disk 299 // CF2 should be flushed to disk 300 assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize()); 301 assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize()); 302 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize()); 303 304 // CF3 shouldn't have been touched. 305 assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); 306 307 // the smallest LSN of CF3 shouldn't change 308 assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); 309 310 // CF3 should be bottleneck for WAL 311 assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); 312 313 // Flush!!!!!!!!!!!!!!!!!!!!!! 314 // Trying to clean the existing memstores, CF2 all flushed to disk. The single 315 // memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk. 316 region.flush(true); 317 318 // Recalculate everything 319 MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize(); 320 MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize(); 321 MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize(); 322 long smallestSeqInRegionCurrentMemstorePhaseV = AbstractTestFSWAL 323 .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes()); 324 325 assertEquals(0, cf1MemstoreSizePhaseV.getDataSize()); 326 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize()); 327 assertEquals(0, cf2MemstoreSizePhaseV.getDataSize()); 328 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize()); 329 assertEquals(0, cf3MemstoreSizePhaseV.getDataSize()); 330 assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize()); 331 332 // What happens when we hit the memstore limit, but we are not able to find 333 // any Column Family above the threshold? 334 // In that case, we should flush all the CFs. 335 336 // The memstore limit is 100*1024 and the column family flush threshold is 337 // around 25*1024. We try to just hit the memstore limit with each CF's 338 // memstore being below the CF flush threshold. 339 for (int i = 1; i <= 300; i++) { 340 region.put(createPut(1, i)); 341 region.put(createPut(2, i)); 342 region.put(createPut(3, i)); 343 region.put(createPut(4, i)); 344 region.put(createPut(5, i)); 345 } 346 347 region.flush(false); 348 349 s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: " 350 + smallestSeqInRegionCurrentMemstorePhaseV 351 + ". After additional inserts and last flush, the entire region size is:" 352 + region.getMemStoreDataSize() + "\n----------------------------------\n"; 353 354 // Since we won't find any CF above the threshold, and hence no specific 355 // store to flush, we should flush all the memstores 356 // Also compacted memstores are flushed to disk. 357 assertEquals(0, region.getMemStoreDataSize()); 358 System.out.println(s); 359 HBaseTestingUtil.closeRegionAndWAL(region); 360 } 361 362 /*------------------------------------------------------------------------------*/ 363 /* Check the same as above but for index-compaction type of compacting memstore */ 364 @Test 365 public void testSelectiveFlushWithIndexCompaction() throws IOException { 366 /*------------------------------------------------------------------------------*/ 367 /* SETUP */ 368 // Set up the configuration 369 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 370 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 371 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); 372 // set memstore to index-compaction 373 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 374 String.valueOf(MemoryCompactionPolicy.BASIC)); 375 376 // Initialize the region 377 HRegion region = initHRegion("testSelectiveFlushWithIndexCompaction", conf); 378 verifyInMemoryFlushSize(region); 379 /*------------------------------------------------------------------------------*/ 380 /* PHASE I - insertions */ 381 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 382 for (int i = 1; i <= 1200; i++) { 383 region.put(createPut(1, i)); // compacted memstore 384 if (i <= 100) { 385 region.put(createPut(2, i)); 386 if (i <= 50) { 387 region.put(createDoublePut(3, i)); // subject for in-memory compaction 388 } 389 } 390 } 391 // Now add more puts for CF2, so that we only flush CF2 to disk 392 for (int i = 100; i < 2000; i++) { 393 region.put(createPut(2, i)); 394 } 395 396 /*------------------------------------------------------------------------------*/ 397 /*------------------------------------------------------------------------------*/ 398 /* PHASE I - collect sizes */ 399 long totalMemstoreSizePhaseI = region.getMemStoreDataSize(); 400 // Find the smallest LSNs for edits wrt to each CF. 401 long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1); 402 long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2); 403 long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3); 404 // Find the sizes of the memstores of each CF. 405 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 406 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 407 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 408 // Get the overall smallest LSN in the region's memstores. 409 long smallestSeqInRegionCurrentMemstorePhaseI = AbstractTestFSWAL 410 .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes()); 411 412 /*------------------------------------------------------------------------------*/ 413 /* PHASE I - validation */ 414 // The overall smallest LSN in the region's memstores should be the same as 415 // the LSN of the smallest edit in CF1 416 assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI); 417 // Some other sanity checks. 418 assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI); 419 assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI); 420 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 421 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 422 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 423 424 // The total memstore size should be the same as the sum of the sizes of 425 // memstores of CF1, CF2 and CF3. 426 assertEquals(totalMemstoreSizePhaseI, cf1MemstoreSizePhaseI.getDataSize() 427 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); 428 429 /*------------------------------------------------------------------------------*/ 430 /* PHASE I - Flush */ 431 // First Flush in Test!!!!!!!!!!!!!!!!!!!!!! 432 // CF1, CF2, CF3, all together they are above the flush size lower bound. 433 // Since CF1 and CF3 are compacting, CF2 is going to be flushed to disk. 434 // CF1 and CF3 - flushed to memory and flatten explicitly 435 region.flush(false); 436 CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 437 CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 438 cms1.flushInMemory(); 439 cms3.flushInMemory(); 440 441 // CF3/CF1 should be merged so wait here to be sure the compaction is done 442 while ( 443 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 444 .isMemStoreFlushingInMemory() 445 ) { 446 Threads.sleep(10); 447 } 448 while ( 449 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 450 .isMemStoreFlushingInMemory() 451 ) { 452 Threads.sleep(10); 453 } 454 455 /*------------------------------------------------------------------------------*/ 456 /*------------------------------------------------------------------------------*/ 457 /* PHASE II - collect sizes */ 458 // Recalculate everything 459 MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); 460 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 461 MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); 462 long smallestSeqInRegionCurrentMemstorePhaseII = AbstractTestFSWAL 463 .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes()); 464 // Find the smallest LSNs for edits wrt to each CF. 465 long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); 466 long totalMemstoreSizePhaseII = region.getMemStoreDataSize(); 467 468 /*------------------------------------------------------------------------------*/ 469 /* PHASE II - validation */ 470 // CF1 was flushed to memory, should be flattened and take less space 471 assertEquals(cf1MemstoreSizePhaseII.getDataSize(), cf1MemstoreSizePhaseI.getDataSize()); 472 assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize()); 473 // CF2 should become empty 474 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 475 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize()); 476 // verify that CF3 was flushed to memory and was not compacted (this is an approximation check) 477 // if compacted CF# should be at least twice less because its every key was duplicated 478 assertEquals(cf3MemstoreSizePhaseII.getDataSize(), cf3MemstoreSizePhaseI.getDataSize()); 479 assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 < cf3MemstoreSizePhaseII.getHeapSize()); 480 481 // Now the smallest LSN in the region should be the same as the smallest 482 // LSN in the memstore of CF1. 483 assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI); 484 // The total memstore size should be the same as the sum of the sizes of 485 // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline 486 // items in CF1/2 487 assertEquals(totalMemstoreSizePhaseII, cf1MemstoreSizePhaseII.getDataSize() 488 + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize()); 489 490 /*------------------------------------------------------------------------------*/ 491 /*------------------------------------------------------------------------------*/ 492 /* PHASE III - insertions */ 493 // Now add more puts for CF1, so that we also flush CF1 to disk instead of 494 // memory in next flush. This is causing the CF! to be flushed to memory twice. 495 for (int i = 1200; i < 8000; i++) { 496 region.put(createPut(1, i)); 497 } 498 499 // CF1 should be flatten and merged so wait here to be sure the compaction is done 500 while ( 501 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 502 .isMemStoreFlushingInMemory() 503 ) { 504 Threads.sleep(10); 505 } 506 507 /*------------------------------------------------------------------------------*/ 508 /* PHASE III - collect sizes */ 509 // How much does the CF1 memstore occupy now? Will be used later. 510 MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); 511 long totalMemstoreSizePhaseIII = region.getMemStoreDataSize(); 512 513 /*------------------------------------------------------------------------------*/ 514 /* PHASE III - validation */ 515 // The total memstore size should be the same as the sum of the sizes of 516 // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline 517 // items in CF1/2 518 assertEquals(totalMemstoreSizePhaseIII, cf1MemstoreSizePhaseIII.getDataSize() 519 + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize()); 520 521 /*------------------------------------------------------------------------------*/ 522 /* PHASE III - Flush */ 523 // Second Flush in Test!!!!!!!!!!!!!!!!!!!!!! 524 // CF1 is flushed to disk, but not entirely emptied. 525 // CF2 was and remained empty, same way nothing happens to CF3 526 region.flush(false); 527 528 /*------------------------------------------------------------------------------*/ 529 /*------------------------------------------------------------------------------*/ 530 /* PHASE IV - collect sizes */ 531 // Recalculate everything 532 MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); 533 MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); 534 MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize(); 535 long smallestSeqInRegionCurrentMemstorePhaseIV = AbstractTestFSWAL 536 .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes()); 537 long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); 538 539 /*------------------------------------------------------------------------------*/ 540 /* PHASE IV - validation */ 541 // CF1's biggest pipeline component (inserted before first flush) should be flushed to disk 542 // CF2 should remain empty 543 assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize()); 544 assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize()); 545 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize()); 546 // CF3 shouldn't have been touched. 547 assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); 548 // the smallest LSN of CF3 shouldn't change 549 assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); 550 // CF3 should be bottleneck for WAL 551 assertEquals(smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); 552 553 /*------------------------------------------------------------------------------*/ 554 /* PHASE IV - Flush */ 555 // Third Flush in Test!!!!!!!!!!!!!!!!!!!!!! 556 // Force flush to disk on all memstores (flush parameter true). 557 // CF1/CF3 all flushed to disk. Note that active sets of CF1 and CF3 are empty 558 region.flush(true); 559 560 /*------------------------------------------------------------------------------*/ 561 /*------------------------------------------------------------------------------*/ 562 /* PHASE V - collect sizes */ 563 // Recalculate everything 564 MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize(); 565 MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize(); 566 MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize(); 567 long smallestSeqInRegionCurrentMemstorePhaseV = AbstractTestFSWAL 568 .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes()); 569 long totalMemstoreSizePhaseV = region.getMemStoreDataSize(); 570 571 /*------------------------------------------------------------------------------*/ 572 /* PHASE V - validation */ 573 assertEquals(0, cf1MemstoreSizePhaseV.getDataSize()); 574 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize()); 575 assertEquals(0, cf2MemstoreSizePhaseV.getDataSize()); 576 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize()); 577 assertEquals(0, cf3MemstoreSizePhaseV.getDataSize()); 578 assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize()); 579 // The total memstores size should be empty 580 assertEquals(0, totalMemstoreSizePhaseV); 581 // Because there is nothing in any memstore the WAL's LSN should be -1 582 assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstorePhaseV); 583 584 // What happens when we hit the memstore limit, but we are not able to find 585 // any Column Family above the threshold? 586 // In that case, we should flush all the CFs. 587 588 /*------------------------------------------------------------------------------*/ 589 /*------------------------------------------------------------------------------*/ 590 /* PHASE VI - insertions */ 591 // The memstore limit is 200*1024 and the column family flush threshold is 592 // around 50*1024. We try to just hit the memstore limit with each CF's 593 // memstore being below the CF flush threshold. 594 for (int i = 1; i <= 300; i++) { 595 region.put(createPut(1, i)); 596 region.put(createPut(2, i)); 597 region.put(createPut(3, i)); 598 region.put(createPut(4, i)); 599 region.put(createPut(5, i)); 600 } 601 602 MemStoreSize cf1ActiveSizePhaseVI = region.getStore(FAMILY1).getMemStoreSize(); 603 MemStoreSize cf3ActiveSizePhaseVI = region.getStore(FAMILY3).getMemStoreSize(); 604 MemStoreSize cf5ActiveSizePhaseVI = region.getStore(FAMILIES[4]).getMemStoreSize(); 605 606 /*------------------------------------------------------------------------------*/ 607 /* PHASE VI - Flush */ 608 // Fourth Flush in Test!!!!!!!!!!!!!!!!!!!!!! 609 // None among compacting memstores was flushed to memory due to previous puts. 610 // But is going to be moved to pipeline and flatten due to the flush. 611 region.flush(false); 612 // Since we won't find any CF above the threshold, and hence no specific 613 // store to flush, we should flush all the memstores 614 // Also compacted memstores are flushed to disk, but not entirely emptied 615 MemStoreSize cf1ActiveSizePhaseVII = region.getStore(FAMILY1).getMemStoreSize(); 616 MemStoreSize cf3ActiveSizePhaseVII = region.getStore(FAMILY3).getMemStoreSize(); 617 MemStoreSize cf5ActiveSizePhaseVII = region.getStore(FAMILIES[4]).getMemStoreSize(); 618 619 assertTrue(cf1ActiveSizePhaseVII.getDataSize() < cf1ActiveSizePhaseVI.getDataSize()); 620 assertTrue(cf3ActiveSizePhaseVII.getDataSize() < cf3ActiveSizePhaseVI.getDataSize()); 621 assertTrue(cf5ActiveSizePhaseVII.getDataSize() < cf5ActiveSizePhaseVI.getDataSize()); 622 623 HBaseTestingUtil.closeRegionAndWAL(region); 624 } 625 626 @Test 627 public void testSelectiveFlushAndWALinDataCompaction() throws IOException { 628 // Set up the configuration 629 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 630 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 631 // set memstore to do data compaction and not to use the speculative scan 632 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 633 String.valueOf(MemoryCompactionPolicy.EAGER)); 634 635 // Intialize the HRegion 636 HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); 637 verifyInMemoryFlushSize(region); 638 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 639 for (int i = 1; i <= 1200; i++) { 640 region.put(createPut(1, i)); 641 if (i <= 100) { 642 region.put(createPut(2, i)); 643 if (i <= 50) { 644 region.put(createPut(3, i)); 645 } 646 } 647 } 648 // Now add more puts for CF2, so that we only flush CF2 to disk 649 for (int i = 100; i < 2000; i++) { 650 region.put(createPut(2, i)); 651 } 652 653 // in this test check the non-composite snapshot - flashing only tail of the pipeline 654 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).setCompositeSnapshot(false); 655 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).setCompositeSnapshot(false); 656 657 long totalMemstoreSize = region.getMemStoreDataSize(); 658 659 // Find the sizes of the memstores of each CF. 660 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 661 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 662 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 663 664 // Some other sanity checks. 665 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 666 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 667 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 668 669 // The total memstore size should be the same as the sum of the sizes of 670 // memstores of CF1, CF2 and CF3. 671 String msg = "totalMemstoreSize=" + totalMemstoreSize + " DefaultMemStore.DEEP_OVERHEAD=" 672 + DefaultMemStore.DEEP_OVERHEAD + " cf1MemstoreSizePhaseI=" + cf1MemstoreSizePhaseI 673 + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI + " cf3MemstoreSizePhaseI=" 674 + cf3MemstoreSizePhaseI; 675 assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() 676 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); 677 678 // Flush! 679 CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 680 CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 681 cms1.flushInMemory(); 682 cms3.flushInMemory(); 683 region.flush(false); 684 685 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 686 687 long smallestSeqInRegionCurrentMemstorePhaseII = AbstractTestFSWAL 688 .getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes()); 689 long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); 690 long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); 691 long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); 692 693 // CF2 should have been cleared 694 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 695 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize()); 696 697 String s = "\n\n----------------------------------\n" 698 + "Upon initial insert and flush, LSN of CF1 is:" + smallestSeqCF1PhaseII + ". LSN of CF2 is:" 699 + smallestSeqCF2PhaseII + ". LSN of CF3 is:" + smallestSeqCF3PhaseII 700 + ", smallestSeqInRegionCurrentMemstore:" + smallestSeqInRegionCurrentMemstorePhaseII + "\n"; 701 702 // Add same entries to compact them later 703 for (int i = 1; i <= 1200; i++) { 704 region.put(createPut(1, i)); 705 if (i <= 100) { 706 region.put(createPut(2, i)); 707 if (i <= 50) { 708 region.put(createPut(3, i)); 709 } 710 } 711 } 712 // Now add more puts for CF2, so that we only flush CF2 to disk 713 for (int i = 100; i < 2000; i++) { 714 region.put(createPut(2, i)); 715 } 716 717 long smallestSeqInRegionCurrentMemstorePhaseIII = AbstractTestFSWAL 718 .getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes()); 719 long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); 720 long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2); 721 long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3); 722 723 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIII 724 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIII + ", " 725 + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIII + ", the smallest sequence in CF3:" 726 + smallestSeqCF3PhaseIII + "\n"; 727 728 // Flush! 729 cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 730 cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 731 cms1.flushInMemory(); 732 cms3.flushInMemory(); 733 region.flush(false); 734 735 long smallestSeqInRegionCurrentMemstorePhaseIV = AbstractTestFSWAL 736 .getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes()); 737 long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); 738 long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); 739 long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); 740 741 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV 742 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " 743 + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:" 744 + smallestSeqCF3PhaseIV + "\n"; 745 746 // now check that the LSN of the entire WAL, of CF1 and of CF3 has progressed due to compaction 747 assertTrue(s, 748 smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII); 749 assertTrue(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII); 750 assertTrue(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII); 751 752 HBaseTestingUtil.closeRegionAndWAL(region); 753 } 754 755 @Test 756 public void testSelectiveFlushWithBasicAndMerge() throws IOException { 757 // Set up the configuration 758 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 759 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 760 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.8); 761 // set memstore to do index compaction with merge 762 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 763 String.valueOf(MemoryCompactionPolicy.BASIC)); 764 // length of pipeline that requires merge 765 conf.setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1); 766 767 // Intialize the HRegion 768 HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf); 769 verifyInMemoryFlushSize(region); 770 // Add 1200 entries for CF1 (CompactingMemStore), 100 for CF2 (DefaultMemStore) and 50 for CF3 771 for (int i = 1; i <= 1200; i++) { 772 region.put(createPut(1, i)); 773 if (i <= 100) { 774 region.put(createPut(2, i)); 775 if (i <= 50) { 776 region.put(createPut(3, i)); 777 } 778 } 779 } 780 // Now put more entries to CF2 781 for (int i = 100; i < 2000; i++) { 782 region.put(createPut(2, i)); 783 } 784 785 long totalMemstoreSize = region.getMemStoreDataSize(); 786 787 // test in-memory flashing into CAM here 788 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 789 .setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 790 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 791 .setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 792 793 // Find the sizes of the memstores of each CF. 794 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 795 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 796 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 797 798 // Some other sanity checks. 799 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 800 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 801 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 802 803 // The total memstore size should be the same as the sum of the sizes of 804 // memstores of CF1, CF2 and CF3. 805 assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() 806 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); 807 808 // Initiate in-memory Flush! 809 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory(); 810 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory(); 811 // CF1 and CF3 should be flatten and merged so wait here to be sure the merge is done 812 while ( 813 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 814 .isMemStoreFlushingInMemory() 815 ) { 816 Threads.sleep(10); 817 } 818 while ( 819 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 820 .isMemStoreFlushingInMemory() 821 ) { 822 Threads.sleep(10); 823 } 824 825 // Flush-to-disk! CF2 only should be flushed 826 region.flush(false); 827 828 MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); 829 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 830 MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); 831 832 // CF1 should be flushed in memory and just flattened, so CF1 heap overhead should be smaller 833 assertTrue(cf1MemstoreSizePhaseI.getHeapSize() > cf1MemstoreSizePhaseII.getHeapSize()); 834 // CF1 should be flushed in memory and just flattened, so CF1 data size should remain the same 835 assertEquals(cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseII.getDataSize()); 836 // CF2 should have been cleared 837 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 838 839 // Add the same amount of entries to see the merging 840 for (int i = 1; i <= 1200; i++) { 841 region.put(createPut(1, i)); 842 if (i <= 100) { 843 region.put(createPut(2, i)); 844 if (i <= 50) { 845 region.put(createPut(3, i)); 846 } 847 } 848 } 849 // Now add more puts for CF2, so that we only flush CF2 to disk 850 for (int i = 100; i < 2000; i++) { 851 region.put(createPut(2, i)); 852 } 853 854 MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); 855 856 // Flush in memory! 857 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory(); 858 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory(); 859 // CF1 and CF3 should be merged so wait here to be sure the merge is done 860 while ( 861 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 862 .isMemStoreFlushingInMemory() 863 ) { 864 Threads.sleep(10); 865 } 866 while ( 867 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 868 .isMemStoreFlushingInMemory() 869 ) { 870 Threads.sleep(10); 871 } 872 region.flush(false); 873 874 MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); 875 MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); 876 877 assertEquals(2 * cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize()); 878 // the decrease in the heap size due to usage of CellArrayMap instead of CSLM 879 // should be the same in flattening and in merge (first and second in-memory-flush) 880 // but in phase 1 we do not yet have immutable segment 881 assertEquals(cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(), 882 cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize() 883 - CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); 884 assertEquals(3, // active, one in pipeline, snapshot 885 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).getSegments().size()); 886 // CF2 should have been cleared 887 assertEquals("\n<<< DEBUG: The data--heap sizes of stores before/after first flushes," 888 + " CF1: " + cf1MemstoreSizePhaseI.getDataSize() + "/" + cf1MemstoreSizePhaseII.getDataSize() 889 + "--" + cf1MemstoreSizePhaseI.getHeapSize() + "/" + cf1MemstoreSizePhaseII.getHeapSize() 890 + ", CF2: " + cf2MemstoreSizePhaseI.getDataSize() + "/" + cf2MemstoreSizePhaseII.getDataSize() 891 + "--" + cf2MemstoreSizePhaseI.getHeapSize() + "/" + cf2MemstoreSizePhaseII.getHeapSize() 892 + ", CF3: " + cf3MemstoreSizePhaseI.getDataSize() + "/" + cf3MemstoreSizePhaseII.getDataSize() 893 + "--" + cf3MemstoreSizePhaseI.getHeapSize() + "/" + cf3MemstoreSizePhaseII.getHeapSize() 894 + "\n<<< AND before/after second flushes " + " CF1: " + cf1MemstoreSizePhaseIII.getDataSize() 895 + "/" + cf1MemstoreSizePhaseIV.getDataSize() + "--" + cf1MemstoreSizePhaseIII.getHeapSize() 896 + "/" + cf1MemstoreSizePhaseIV.getHeapSize() + "\n", 0, cf2MemstoreSizePhaseIV.getDataSize()); 897 898 HBaseTestingUtil.closeRegionAndWAL(region); 899 } 900 901 // should end in 300 seconds (5 minutes) 902 @Test 903 public void testStressFlushAndWALinIndexCompaction() throws IOException { 904 // Set up the configuration 905 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); 906 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 907 200 * 1024); 908 // set memstore to do data compaction and not to use the speculative scan 909 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 910 String.valueOf(MemoryCompactionPolicy.BASIC)); 911 912 // Successfully initialize the HRegion 913 HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); 914 verifyInMemoryFlushSize(region); 915 Thread[] threads = new Thread[25]; 916 for (int i = 0; i < threads.length; i++) { 917 int id = i * 10000; 918 ConcurrentPutRunnable runnable = new ConcurrentPutRunnable(region, id); 919 threads[i] = new Thread(runnable); 920 threads[i].start(); 921 } 922 Threads.sleep(10000); // let other threads start 923 region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts 924 Threads.sleep(10000); // let other threads continue 925 region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts 926 927 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory(); 928 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory(); 929 while ( 930 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 931 .isMemStoreFlushingInMemory() 932 ) { 933 Threads.sleep(10); 934 } 935 while ( 936 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 937 .isMemStoreFlushingInMemory() 938 ) { 939 Threads.sleep(10); 940 } 941 942 for (int i = 0; i < threads.length; i++) { 943 try { 944 threads[i].join(); 945 } catch (InterruptedException e) { 946 e.printStackTrace(); 947 } 948 } 949 } 950 951 /** 952 * The in-memory-flusher thread performs the flush asynchronously. There is at most one thread per 953 * memstore instance. It takes the updatesLock exclusively, pushes active into the pipeline, 954 * releases updatesLock and compacts the pipeline. 955 */ 956 private class ConcurrentPutRunnable implements Runnable { 957 private final HRegion stressedRegion; 958 private final int startNumber; 959 960 ConcurrentPutRunnable(HRegion r, int i) { 961 this.stressedRegion = r; 962 this.startNumber = i; 963 } 964 965 @Override 966 public void run() { 967 968 try { 969 int dummy = startNumber / 10000; 970 System.out.print("Thread " + dummy + " with start number " + startNumber + " starts\n"); 971 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 972 for (int i = startNumber; i <= startNumber + 3000; i++) { 973 stressedRegion.put(createPut(1, i)); 974 if (i <= startNumber + 2000) { 975 stressedRegion.put(createPut(2, i)); 976 if (i <= startNumber + 1000) { 977 stressedRegion.put(createPut(3, i)); 978 } 979 } 980 } 981 System.out.print("Thread with start number " + startNumber + " continues to more puts\n"); 982 // Now add more puts for CF2, so that we only flush CF2 to disk 983 for (int i = startNumber + 3000; i < startNumber + 5000; i++) { 984 stressedRegion.put(createPut(2, i)); 985 } 986 // And add more puts for CF1 987 for (int i = startNumber + 5000; i < startNumber + 7000; i++) { 988 stressedRegion.put(createPut(1, i)); 989 } 990 System.out.print("Thread with start number " + startNumber + " flushes\n"); 991 // flush (IN MEMORY) one of the stores (each thread flushes different store) 992 // and wait till the flush and the following action are done 993 if (startNumber == 0) { 994 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore) 995 .flushInMemory(); 996 while ( 997 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore) 998 .isMemStoreFlushingInMemory() 999 ) { 1000 Threads.sleep(10); 1001 } 1002 } 1003 if (startNumber == 10000) { 1004 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore) 1005 .flushInMemory(); 1006 while ( 1007 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore) 1008 .isMemStoreFlushingInMemory() 1009 ) { 1010 Threads.sleep(10); 1011 } 1012 } 1013 if (startNumber == 20000) { 1014 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore) 1015 .flushInMemory(); 1016 while ( 1017 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore) 1018 .isMemStoreFlushingInMemory() 1019 ) { 1020 Threads.sleep(10); 1021 } 1022 } 1023 System.out.print("Thread with start number " + startNumber + " finishes\n"); 1024 } catch (IOException e) { 1025 assert false; 1026 } 1027 } 1028 } 1029 1030 private WAL getWAL(Region region) { 1031 return ((HRegion) region).getWAL(); 1032 } 1033}