001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.lang.management.ManagementFactory; 028import java.util.ArrayList; 029import java.util.List; 030import java.util.concurrent.Executors; 031import java.util.concurrent.ThreadPoolExecutor; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellComparator; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.ExtendedCell; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HBaseTestingUtil; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.KeepDeletedCells; 042import org.apache.hadoop.hbase.KeyValue; 043import org.apache.hadoop.hbase.KeyValueTestUtil; 044import org.apache.hadoop.hbase.MemoryCompactionPolicy; 045import org.apache.hadoop.hbase.TableName; 046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 048import org.apache.hadoop.hbase.client.RegionInfo; 049import org.apache.hadoop.hbase.client.RegionInfoBuilder; 050import org.apache.hadoop.hbase.client.Scan; 051import org.apache.hadoop.hbase.client.TableDescriptor; 052import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 053import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 054import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 055import org.apache.hadoop.hbase.testclassification.MediumTests; 056import org.apache.hadoop.hbase.testclassification.RegionServerTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.hbase.util.EnvironmentEdge; 059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 060import org.apache.hadoop.hbase.util.Threads; 061import org.apache.hadoop.hbase.wal.WAL; 062import org.junit.After; 063import org.junit.Before; 064import org.junit.ClassRule; 065import org.junit.Test; 066import org.junit.experimental.categories.Category; 067import org.mockito.Mockito; 068import org.slf4j.Logger; 069import org.slf4j.LoggerFactory; 070 071/** 072 * compacted memstore test case 073 */ 074@Category({ RegionServerTests.class, MediumTests.class }) 075public class TestCompactingMemStore extends TestDefaultMemStore { 076 077 @ClassRule 078 public static final HBaseClassTestRule CLASS_RULE = 079 HBaseClassTestRule.forClass(TestCompactingMemStore.class); 080 081 private static final Logger LOG = LoggerFactory.getLogger(TestCompactingMemStore.class); 082 protected static ChunkCreator chunkCreator; 083 protected HRegion region; 084 protected RegionServicesForStores regionServicesForStores; 085 protected HStore store; 086 087 ////////////////////////////////////////////////////////////////////////////// 088 // Helpers 089 ////////////////////////////////////////////////////////////////////////////// 090 protected static byte[] makeQualifier(final int i1, final int i2) { 091 return Bytes.toBytes(Integer.toString(i1) + ";" + Integer.toString(i2)); 092 } 093 094 @After 095 public void tearDown() throws Exception { 096 chunkCreator.clearChunksInPool(); 097 } 098 099 @Override 100 @Before 101 public void setUp() throws Exception { 102 compactingSetUp(); 103 this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(), 104 CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER); 105 ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 106 } 107 108 protected void compactingSetUp() throws Exception { 109 super.internalSetUp(); 110 Configuration conf = new Configuration(); 111 conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); 112 conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); 113 conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000); 114 HBaseTestingUtil hbaseUtility = new HBaseTestingUtil(conf); 115 ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.of(FAMILY); 116 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf("foobar")) 117 .setColumnFamily(familyDescriptor).build(); 118 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf("foobar")).build(); 119 WAL wal = HBaseTestingUtil.createWal(conf, hbaseUtility.getDataTestDir(), info); 120 this.region = 121 HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, tableDescriptor, wal, true); 122 this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores()); 123 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 124 Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 125 this.store = new HStore(region, familyDescriptor, conf, false); 126 127 long globalMemStoreLimit = 128 (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() 129 * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); 130 chunkCreator = ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 131 globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, 132 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 133 assertNotNull(chunkCreator); 134 } 135 136 /** 137 * A simple test which flush in memory affect timeOfOldestEdit 138 */ 139 @Test 140 public void testTimeOfOldestEdit() { 141 assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit()); 142 final byte[] r = Bytes.toBytes("r"); 143 final byte[] f = Bytes.toBytes("f"); 144 final byte[] q = Bytes.toBytes("q"); 145 final byte[] v = Bytes.toBytes("v"); 146 final KeyValue kv = new KeyValue(r, f, q, v); 147 memstore.add(kv, null); 148 long timeOfOldestEdit = memstore.timeOfOldestEdit(); 149 assertNotEquals(Long.MAX_VALUE, timeOfOldestEdit); 150 151 ((CompactingMemStore) memstore).flushInMemory(); 152 assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit()); 153 memstore.add(kv, null); 154 assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit()); 155 memstore.snapshot(); 156 assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit()); 157 } 158 159 /** 160 * A simple test which verifies the 3 possible states when scanning across snapshot. 161 */ 162 @Override 163 @Test 164 public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException { 165 // we are going to the scanning across snapshot with two kvs 166 // kv1 should always be returned before kv2 167 final byte[] one = Bytes.toBytes(1); 168 final byte[] two = Bytes.toBytes(2); 169 final byte[] f = Bytes.toBytes("f"); 170 final byte[] q = Bytes.toBytes("q"); 171 final byte[] v = Bytes.toBytes(3); 172 173 final KeyValue kv1 = new KeyValue(one, f, q, 10, v); 174 final KeyValue kv2 = new KeyValue(two, f, q, 10, v); 175 176 // use case 1: both kvs in kvset 177 this.memstore.add(kv1.clone(), null); 178 this.memstore.add(kv2.clone(), null); 179 // snapshot is empty,active segment is not empty, 180 // empty segment is skipped. 181 verifyOneScanAcrossSnapshot2(kv1, kv2); 182 183 // use case 2: both kvs in snapshot 184 this.memstore.snapshot(); 185 // active segment is empty,snapshot is not empty, 186 // empty segment is skipped. 187 verifyOneScanAcrossSnapshot2(kv1, kv2); 188 189 // use case 3: first in snapshot second in kvset 190 this.memstore = new CompactingMemStore(HBaseConfiguration.create(), 191 CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER); 192 this.memstore.add(kv1.clone(), null); 193 // As compaction is starting in the background the repetition 194 // of the k1 might be removed BUT the scanners created earlier 195 // should look on the OLD MutableCellSetSegment, so this should be OK... 196 this.memstore.snapshot(); 197 this.memstore.add(kv2.clone(), null); 198 verifyScanAcrossSnapshot2(kv1, kv2); 199 } 200 201 /** 202 * Test memstore snapshots 203 */ 204 @Override 205 @Test 206 public void testSnapshotting() throws IOException { 207 final int snapshotCount = 5; 208 // Add some rows, run a snapshot. Do it a few times. 209 for (int i = 0; i < snapshotCount; i++) { 210 addRows(this.memstore); 211 runSnapshot(this.memstore, true); 212 assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount()); 213 } 214 } 215 216 ////////////////////////////////////////////////////////////////////////////// 217 // Get tests 218 ////////////////////////////////////////////////////////////////////////////// 219 220 /** 221 * Test getNextRow from memstore 222 */ 223 @Override 224 @Test 225 public void testGetNextRow() throws Exception { 226 addRows(this.memstore); 227 // Add more versions to make it a little more interesting. 228 Thread.sleep(1); 229 addRows(this.memstore); 230 Cell closestToEmpty = ((CompactingMemStore) this.memstore).getNextRow(KeyValue.LOWESTKEY); 231 assertTrue(CellComparator.getInstance().compareRows(closestToEmpty, 232 new KeyValue(Bytes.toBytes(0), EnvironmentEdgeManager.currentTime())) == 0); 233 for (int i = 0; i < ROW_COUNT; i++) { 234 Cell nr = ((CompactingMemStore) this.memstore) 235 .getNextRow(new KeyValue(Bytes.toBytes(i), EnvironmentEdgeManager.currentTime())); 236 if (i + 1 == ROW_COUNT) { 237 assertNull(nr); 238 } else { 239 assertTrue(CellComparator.getInstance().compareRows(nr, 240 new KeyValue(Bytes.toBytes(i + 1), EnvironmentEdgeManager.currentTime())) == 0); 241 } 242 } 243 // starting from each row, validate results should contain the starting row 244 Configuration conf = HBaseConfiguration.create(); 245 for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { 246 ScanInfo scanInfo = 247 new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE, 248 HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false); 249 try (InternalScanner scanner = 250 new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null, 251 memstore.getScanners(0))) { 252 List<Cell> results = new ArrayList<>(); 253 for (int i = 0; scanner.next(results); i++) { 254 int rowId = startRowId + i; 255 Cell left = results.get(0); 256 byte[] row1 = Bytes.toBytes(rowId); 257 assertTrue("Row name", 258 CellComparator.getInstance().compareRows(left, row1, 0, row1.length) == 0); 259 assertEquals("Count of columns", QUALIFIER_COUNT, results.size()); 260 List<Cell> row = new ArrayList<>(); 261 for (Cell kv : results) { 262 row.add(kv); 263 } 264 isExpectedRowWithoutTimestamps(rowId, row); 265 // Clear out set. Otherwise row results accumulate. 266 results.clear(); 267 } 268 } 269 } 270 } 271 272 @Override 273 @Test 274 public void testGet_memstoreAndSnapShot() throws IOException { 275 byte[] row = Bytes.toBytes("testrow"); 276 byte[] fam = Bytes.toBytes("testfamily"); 277 byte[] qf1 = Bytes.toBytes("testqualifier1"); 278 byte[] qf2 = Bytes.toBytes("testqualifier2"); 279 byte[] qf3 = Bytes.toBytes("testqualifier3"); 280 byte[] qf4 = Bytes.toBytes("testqualifier4"); 281 byte[] qf5 = Bytes.toBytes("testqualifier5"); 282 byte[] val = Bytes.toBytes("testval"); 283 284 // Setting up memstore 285 memstore.add(new KeyValue(row, fam, qf1, val), null); 286 memstore.add(new KeyValue(row, fam, qf2, val), null); 287 memstore.add(new KeyValue(row, fam, qf3, val), null); 288 // Pushing to pipeline 289 ((CompactingMemStore) memstore).flushInMemory(); 290 assertEquals(0, memstore.getSnapshot().getCellsCount()); 291 // Creating a snapshot 292 memstore.snapshot(); 293 assertEquals(3, memstore.getSnapshot().getCellsCount()); 294 // Adding value to "new" memstore 295 assertEquals(0, memstore.getActive().getCellsCount()); 296 memstore.add(new KeyValue(row, fam, qf4, val), null); 297 memstore.add(new KeyValue(row, fam, qf5, val), null); 298 assertEquals(2, memstore.getActive().getCellsCount()); 299 } 300 301 //////////////////////////////////// 302 // Test for periodic memstore flushes 303 // based on time of oldest edit 304 //////////////////////////////////// 305 306 /** 307 * Add keyvalues with a fixed memstoreTs, and checks that memstore size is decreased as older 308 * keyvalues are deleted from the memstore. 309 */ 310 @Override 311 @Test 312 public void testUpsertMemstoreSize() throws Exception { 313 MemStoreSize oldSize = memstore.size(); 314 315 List<ExtendedCell> l = new ArrayList<>(); 316 KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); 317 KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v"); 318 KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v"); 319 320 kv1.setSequenceId(1); 321 kv2.setSequenceId(1); 322 kv3.setSequenceId(1); 323 l.add(kv1); 324 l.add(kv2); 325 l.add(kv3); 326 327 this.memstore.upsert(l, 2, null);// readpoint is 2 328 MemStoreSize newSize = this.memstore.size(); 329 assert (newSize.getDataSize() > oldSize.getDataSize()); 330 // The kv1 should be removed. 331 assert (memstore.getActive().getCellsCount() == 2); 332 333 KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); 334 kv4.setSequenceId(1); 335 l.clear(); 336 l.add(kv4); 337 this.memstore.upsert(l, 3, null); 338 assertEquals(newSize, this.memstore.size()); 339 // The kv2 should be removed. 340 assert (memstore.getActive().getCellsCount() == 2); 341 // this.memstore = null; 342 } 343 344 /** 345 * Tests that the timeOfOldestEdit is updated correctly for the various edit operations in 346 * memstore. 347 */ 348 @Override 349 @Test 350 public void testUpdateToTimeOfOldestEdit() throws Exception { 351 try { 352 EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); 353 EnvironmentEdgeManager.injectEdge(edge); 354 long t = memstore.timeOfOldestEdit(); 355 assertEquals(Long.MAX_VALUE, t); 356 357 // test the case that the timeOfOldestEdit is updated after a KV add 358 memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null); 359 t = memstore.timeOfOldestEdit(); 360 assertTrue(t == 1234); 361 // The method will also assert 362 // the value is reset to Long.MAX_VALUE 363 t = runSnapshot(memstore, true); 364 365 // test the case that the timeOfOldestEdit is updated after a KV delete 366 memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null); 367 t = memstore.timeOfOldestEdit(); 368 assertTrue(t == 1234); 369 t = runSnapshot(memstore, true); 370 371 // test the case that the timeOfOldestEdit is updated after a KV upsert 372 List<ExtendedCell> l = new ArrayList<>(); 373 KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); 374 kv1.setSequenceId(100); 375 l.add(kv1); 376 memstore.upsert(l, 1000, null); 377 t = memstore.timeOfOldestEdit(); 378 assertTrue(t == 1234); 379 } finally { 380 EnvironmentEdgeManager.reset(); 381 } 382 } 383 384 private long runSnapshot(final AbstractMemStore hmc, boolean useForce) throws IOException { 385 // Save off old state. 386 long oldHistorySize = hmc.getSnapshot().getDataSize(); 387 long prevTimeStamp = hmc.timeOfOldestEdit(); 388 389 hmc.snapshot(); 390 MemStoreSnapshot snapshot = hmc.snapshot(); 391 if (useForce) { 392 // Make some assertions about what just happened. 393 assertTrue("History size has not increased", oldHistorySize < snapshot.getDataSize()); 394 long t = hmc.timeOfOldestEdit(); 395 assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE); 396 hmc.clearSnapshot(snapshot.getId()); 397 } else { 398 long t = hmc.timeOfOldestEdit(); 399 assertTrue("Time of oldest edit didn't remain the same", t == prevTimeStamp); 400 } 401 return prevTimeStamp; 402 } 403 404 private void isExpectedRowWithoutTimestamps(final int rowIndex, List<Cell> kvs) { 405 int i = 0; 406 for (Cell kv : kvs) { 407 byte[] expectedColname = makeQualifier(rowIndex, i++); 408 assertTrue("Column name", CellUtil.matchingQualifier(kv, expectedColname)); 409 // Value is column name as bytes. Usually result is 410 // 100 bytes in size at least. This is the default size 411 // for BytesWriteable. For comparison, convert bytes to 412 // String and trim to remove trailing null bytes. 413 assertTrue("Content", CellUtil.matchingValue(kv, expectedColname)); 414 } 415 } 416 417 @Test 418 public void testPuttingBackChunksAfterFlushing() throws IOException { 419 byte[] row = Bytes.toBytes("testrow"); 420 byte[] fam = Bytes.toBytes("testfamily"); 421 byte[] qf1 = Bytes.toBytes("testqualifier1"); 422 byte[] qf2 = Bytes.toBytes("testqualifier2"); 423 byte[] qf3 = Bytes.toBytes("testqualifier3"); 424 byte[] qf4 = Bytes.toBytes("testqualifier4"); 425 byte[] qf5 = Bytes.toBytes("testqualifier5"); 426 byte[] val = Bytes.toBytes("testval"); 427 428 // Setting up memstore 429 memstore.add(new KeyValue(row, fam, qf1, val), null); 430 memstore.add(new KeyValue(row, fam, qf2, val), null); 431 memstore.add(new KeyValue(row, fam, qf3, val), null); 432 433 // Creating a snapshot 434 MemStoreSnapshot snapshot = memstore.snapshot(); 435 assertEquals(3, memstore.getSnapshot().getCellsCount()); 436 437 // Adding value to "new" memstore 438 assertEquals(0, memstore.getActive().getCellsCount()); 439 memstore.add(new KeyValue(row, fam, qf4, val), null); 440 memstore.add(new KeyValue(row, fam, qf5, val), null); 441 assertEquals(2, memstore.getActive().getCellsCount()); 442 // close the scanners 443 for (KeyValueScanner scanner : snapshot.getScanners()) { 444 scanner.close(); 445 } 446 memstore.clearSnapshot(snapshot.getId()); 447 448 int chunkCount = chunkCreator.getPoolSize(); 449 assertTrue(chunkCount > 0); 450 451 } 452 453 @Test 454 public void testPuttingBackChunksWithOpeningScanner() throws IOException { 455 byte[] row = Bytes.toBytes("testrow"); 456 byte[] fam = Bytes.toBytes("testfamily"); 457 byte[] qf1 = Bytes.toBytes("testqualifier1"); 458 byte[] qf2 = Bytes.toBytes("testqualifier2"); 459 byte[] qf3 = Bytes.toBytes("testqualifier3"); 460 byte[] qf4 = Bytes.toBytes("testqualifier4"); 461 byte[] qf5 = Bytes.toBytes("testqualifier5"); 462 byte[] qf6 = Bytes.toBytes("testqualifier6"); 463 byte[] qf7 = Bytes.toBytes("testqualifier7"); 464 byte[] val = Bytes.toBytes("testval"); 465 466 // Setting up memstore 467 memstore.add(new KeyValue(row, fam, qf1, val), null); 468 memstore.add(new KeyValue(row, fam, qf2, val), null); 469 memstore.add(new KeyValue(row, fam, qf3, val), null); 470 471 // Creating a snapshot 472 MemStoreSnapshot snapshot = memstore.snapshot(); 473 assertEquals(3, memstore.getSnapshot().getCellsCount()); 474 475 // Adding value to "new" memstore 476 assertEquals(0, memstore.getActive().getCellsCount()); 477 memstore.add(new KeyValue(row, fam, qf4, val), null); 478 memstore.add(new KeyValue(row, fam, qf5, val), null); 479 assertEquals(2, memstore.getActive().getCellsCount()); 480 481 // opening scanner before clear the snapshot 482 List<KeyValueScanner> scanners = memstore.getScanners(0); 483 // Shouldn't putting back the chunks to pool,since some scanners are opening 484 // based on their data 485 // close the scanners 486 for (KeyValueScanner scanner : snapshot.getScanners()) { 487 scanner.close(); 488 } 489 memstore.clearSnapshot(snapshot.getId()); 490 491 assertTrue(chunkCreator.getPoolSize() == 0); 492 493 // Chunks will be put back to pool after close scanners; 494 for (KeyValueScanner scanner : scanners) { 495 scanner.close(); 496 } 497 assertTrue(chunkCreator.getPoolSize() > 0); 498 499 // clear chunks 500 chunkCreator.clearChunksInPool(); 501 502 // Creating another snapshot 503 504 snapshot = memstore.snapshot(); 505 // Adding more value 506 memstore.add(new KeyValue(row, fam, qf6, val), null); 507 memstore.add(new KeyValue(row, fam, qf7, val), null); 508 // opening scanners 509 scanners = memstore.getScanners(0); 510 // close scanners before clear the snapshot 511 for (KeyValueScanner scanner : scanners) { 512 scanner.close(); 513 } 514 // Since no opening scanner, the chunks of snapshot should be put back to 515 // pool 516 // close the scanners 517 for (KeyValueScanner scanner : snapshot.getScanners()) { 518 scanner.close(); 519 } 520 memstore.clearSnapshot(snapshot.getId()); 521 assertTrue(chunkCreator.getPoolSize() > 0); 522 } 523 524 @Test 525 public void testPuttingBackChunksWithOpeningPipelineScanner() throws IOException { 526 527 // set memstore to do data compaction and not to use the speculative scan 528 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; 529 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 530 String.valueOf(compactionType)); 531 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 532 533 byte[] row = Bytes.toBytes("testrow"); 534 byte[] fam = Bytes.toBytes("testfamily"); 535 byte[] qf1 = Bytes.toBytes("testqualifier1"); 536 byte[] qf2 = Bytes.toBytes("testqualifier2"); 537 byte[] qf3 = Bytes.toBytes("testqualifier3"); 538 byte[] val = Bytes.toBytes("testval"); 539 540 // Setting up memstore 541 memstore.add(new KeyValue(row, fam, qf1, 1, val), null); 542 memstore.add(new KeyValue(row, fam, qf2, 1, val), null); 543 memstore.add(new KeyValue(row, fam, qf3, 1, val), null); 544 545 // Creating a pipeline 546 ((MyCompactingMemStore) memstore).disableCompaction(); 547 ((CompactingMemStore) memstore).flushInMemory(); 548 549 // Adding value to "new" memstore 550 assertEquals(0, memstore.getActive().getCellsCount()); 551 memstore.add(new KeyValue(row, fam, qf1, 2, val), null); 552 memstore.add(new KeyValue(row, fam, qf2, 2, val), null); 553 assertEquals(2, memstore.getActive().getCellsCount()); 554 555 // pipeline bucket 2 556 ((CompactingMemStore) memstore).flushInMemory(); 557 // opening scanner before force flushing 558 List<KeyValueScanner> scanners = memstore.getScanners(0); 559 // Shouldn't putting back the chunks to pool,since some scanners are opening 560 // based on their data 561 ((MyCompactingMemStore) memstore).enableCompaction(); 562 // trigger compaction 563 ((CompactingMemStore) memstore).flushInMemory(); 564 565 // Adding value to "new" memstore 566 assertEquals(0, memstore.getActive().getCellsCount()); 567 memstore.add(new KeyValue(row, fam, qf3, 3, val), null); 568 memstore.add(new KeyValue(row, fam, qf2, 3, val), null); 569 memstore.add(new KeyValue(row, fam, qf1, 3, val), null); 570 assertEquals(3, memstore.getActive().getCellsCount()); 571 572 assertTrue(chunkCreator.getPoolSize() == 0); 573 574 // Chunks will be put back to pool after close scanners; 575 for (KeyValueScanner scanner : scanners) { 576 scanner.close(); 577 } 578 assertTrue(chunkCreator.getPoolSize() > 0); 579 580 // clear chunks 581 chunkCreator.clearChunksInPool(); 582 583 // Creating another snapshot 584 585 MemStoreSnapshot snapshot = memstore.snapshot(); 586 // close the scanners 587 for (KeyValueScanner scanner : snapshot.getScanners()) { 588 scanner.close(); 589 } 590 memstore.clearSnapshot(snapshot.getId()); 591 592 snapshot = memstore.snapshot(); 593 // Adding more value 594 memstore.add(new KeyValue(row, fam, qf2, 4, val), null); 595 memstore.add(new KeyValue(row, fam, qf3, 4, val), null); 596 // opening scanners 597 scanners = memstore.getScanners(0); 598 // close scanners before clear the snapshot 599 for (KeyValueScanner scanner : scanners) { 600 scanner.close(); 601 } 602 // Since no opening scanner, the chunks of snapshot should be put back to 603 // pool 604 // close the scanners 605 for (KeyValueScanner scanner : snapshot.getScanners()) { 606 scanner.close(); 607 } 608 memstore.clearSnapshot(snapshot.getId()); 609 assertTrue(chunkCreator.getPoolSize() > 0); 610 } 611 612 ////////////////////////////////////////////////////////////////////////////// 613 // Compaction tests 614 ////////////////////////////////////////////////////////////////////////////// 615 @Test 616 public void testCompaction1Bucket() throws IOException { 617 618 // set memstore to do basic structure flattening, the "eager" option is tested in 619 // TestCompactingToCellFlatMapMemStore 620 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 621 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 622 String.valueOf(compactionType)); 623 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 624 625 String[] keys1 = { "A", "A", "B", "C" }; // A1, A2, B3, C4 626 627 // test 1 bucket 628 int totalCellsLen = addRowsByKeys(memstore, keys1); 629 int oneCellOnCSLMHeapSize = 120; 630 int oneCellOnCAHeapSize = 88; 631 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 632 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 633 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 634 635 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 636 assertEquals(0, memstore.getSnapshot().getCellsCount()); 637 // There is no compaction, as the compacting memstore type is basic. 638 // totalCellsLen remains the same 639 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 640 + 4 * oneCellOnCAHeapSize; 641 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 642 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 643 644 MemStoreSize mss = memstore.getFlushableSize(); 645 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 646 // simulate flusher 647 region.decrMemStoreSize(mss); 648 ImmutableSegment s = memstore.getSnapshot(); 649 assertEquals(4, s.getCellsCount()); 650 assertEquals(0, regionServicesForStores.getMemStoreSize()); 651 652 memstore.clearSnapshot(snapshot.getId()); 653 } 654 655 @Test 656 public void testCompaction2Buckets() throws IOException { 657 658 // set memstore to do basic structure flattening, the "eager" option is tested in 659 // TestCompactingToCellFlatMapMemStore 660 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 661 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 662 String.valueOf(compactionType)); 663 memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 664 String.valueOf(1)); 665 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 666 String[] keys1 = { "A", "A", "B", "C" }; 667 String[] keys2 = { "A", "B", "D" }; 668 669 int totalCellsLen1 = addRowsByKeys(memstore, keys1); 670 int oneCellOnCSLMHeapSize = 120; 671 int oneCellOnCAHeapSize = 88; 672 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 673 674 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 675 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 676 677 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 678 int counter = 0; 679 for (Segment s : memstore.getSegments()) { 680 counter += s.getCellsCount(); 681 } 682 assertEquals(4, counter); 683 assertEquals(0, memstore.getSnapshot().getCellsCount()); 684 // There is no compaction, as the compacting memstore type is basic. 685 // totalCellsLen remains the same 686 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 687 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 688 + 4 * oneCellOnCAHeapSize; 689 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 690 691 int totalCellsLen2 = addRowsByKeys(memstore, keys2); 692 totalHeapSize += 3 * oneCellOnCSLMHeapSize; 693 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 694 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 695 696 MemStoreSize mss = memstore.getFlushableSize(); 697 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 698 assertEquals(0, memstore.getSnapshot().getCellsCount()); 699 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 700 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 701 + 7 * oneCellOnCAHeapSize; 702 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 703 704 mss = memstore.getFlushableSize(); 705 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 706 // simulate flusher 707 region.decrMemStoreSize(mss); 708 ImmutableSegment s = memstore.getSnapshot(); 709 assertEquals(7, s.getCellsCount()); 710 assertEquals(0, regionServicesForStores.getMemStoreSize()); 711 712 memstore.clearSnapshot(snapshot.getId()); 713 } 714 715 @Test 716 public void testCompaction3Buckets() throws IOException { 717 718 // set memstore to do data compaction and not to use the speculative scan 719 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; 720 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 721 String.valueOf(compactionType)); 722 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 723 String[] keys1 = { "A", "A", "B", "C" }; 724 String[] keys2 = { "A", "B", "D" }; 725 String[] keys3 = { "D", "B", "B" }; 726 727 int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 4 cells. 728 int oneCellOnCSLMHeapSize = 120; 729 int oneCellOnCAHeapSize = 88; 730 assertEquals(totalCellsLen1, region.getMemStoreDataSize()); 731 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 732 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 733 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 734 735 assertEquals(0, memstore.getSnapshot().getCellsCount()); 736 // One cell is duplicated and the compaction will remove it. All cells of same time so adjusting 737 // totalCellsLen 738 totalCellsLen1 = (totalCellsLen1 * 3) / 4; 739 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 740 // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff. 741 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 742 + 3 * oneCellOnCAHeapSize; 743 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 744 745 int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells. 746 long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize; 747 748 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 749 assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); 750 751 ((MyCompactingMemStore) memstore).disableCompaction(); 752 MemStoreSize mss = memstore.getFlushableSize(); 753 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction 754 assertEquals(0, memstore.getSnapshot().getCellsCount()); 755 // No change in the cells data size. ie. memstore size. as there is no compaction. 756 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 757 assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, 758 ((CompactingMemStore) memstore).heapSize()); 759 760 int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added 761 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 762 regionServicesForStores.getMemStoreSize()); 763 long totalHeapSize3 = 764 totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM + 3 * oneCellOnCSLMHeapSize; 765 assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize()); 766 767 ((MyCompactingMemStore) memstore).enableCompaction(); 768 mss = memstore.getFlushableSize(); 769 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 770 assertEquals(0, memstore.getSnapshot().getCellsCount()); 771 // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells. 772 // Out of total 10, only 4 cells are unique 773 totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated 774 totalCellsLen3 = 0;// All duplicated cells. 775 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 776 regionServicesForStores.getMemStoreSize()); 777 // Only 4 unique cells left 778 assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD 779 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize()); 780 781 mss = memstore.getFlushableSize(); 782 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 783 // simulate flusher 784 region.decrMemStoreSize(mss); 785 ImmutableSegment s = memstore.getSnapshot(); 786 assertEquals(4, s.getCellsCount()); 787 assertEquals(0, regionServicesForStores.getMemStoreSize()); 788 789 memstore.clearSnapshot(snapshot.getId()); 790 } 791 792 @Test 793 public void testMagicCompaction3Buckets() throws IOException { 794 795 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE; 796 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 797 String.valueOf(compactionType)); 798 memstore.getConfiguration() 799 .setDouble(AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45); 800 memstore.getConfiguration() 801 .setInt(AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2); 802 memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1); 803 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 804 805 String[] keys1 = { "A", "B", "D" }; 806 String[] keys2 = { "A" }; 807 String[] keys3 = { "A", "A", "B", "C" }; 808 String[] keys4 = { "D", "B", "B" }; 809 810 int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells. 811 int oneCellOnCSLMHeapSize = 120; 812 assertEquals(totalCellsLen1, region.getMemStoreDataSize()); 813 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize; 814 assertEquals(totalHeapSize, memstore.heapSize()); 815 816 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline - flatten 817 assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 818 assertEquals(1.0, 819 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 820 assertEquals(0, memstore.getSnapshot().getCellsCount()); 821 822 addRowsByKeys(memstore, keys2);// Adding 1 more cell - flatten. 823 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction 824 assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 825 assertEquals(1.0, 826 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 827 assertEquals(0, memstore.getSnapshot().getCellsCount()); 828 829 addRowsByKeys(memstore, keys3);// Adding 4 more cells - merge. 830 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction 831 assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 832 assertEquals((4.0 / 8.0), 833 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 834 assertEquals(0, memstore.getSnapshot().getCellsCount()); 835 836 addRowsByKeys(memstore, keys4);// 3 more cells added - compact (or not) 837 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 838 int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells(); 839 assertTrue(4 == numCells || 11 == numCells); 840 assertEquals(0, memstore.getSnapshot().getCellsCount()); 841 842 MemStoreSize mss = memstore.getFlushableSize(); 843 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 844 // simulate flusher 845 region.decrMemStoreSize(mss); 846 ImmutableSegment s = memstore.getSnapshot(); 847 numCells = s.getCellsCount(); 848 assertTrue(4 == numCells || 11 == numCells); 849 assertEquals(0, regionServicesForStores.getMemStoreSize()); 850 851 memstore.clearSnapshot(snapshot.getId()); 852 } 853 854 protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) { 855 byte[] fam = Bytes.toBytes("testfamily"); 856 byte[] qf = Bytes.toBytes("testqualifier"); 857 long size = hmc.getActive().getDataSize(); 858 long heapOverhead = hmc.getActive().getHeapSize(); 859 int cellsCount = hmc.getActive().getCellsCount(); 860 int totalLen = 0; 861 for (int i = 0; i < keys.length; i++) { 862 long timestamp = EnvironmentEdgeManager.currentTime(); 863 Threads.sleep(1); // to make sure each kv gets a different ts 864 byte[] row = Bytes.toBytes(keys[i]); 865 byte[] val = Bytes.toBytes(keys[i] + i); 866 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 867 totalLen += Segment.getCellLength(kv); 868 hmc.add(kv, null); 869 LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); 870 } 871 regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size, 872 hmc.getActive().getHeapSize() - heapOverhead, 0, 873 hmc.getActive().getCellsCount() - cellsCount); 874 return totalLen; 875 } 876 877 // for controlling the val size when adding a new cell 878 protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) { 879 byte[] fam = Bytes.toBytes("testfamily"); 880 byte[] qf = Bytes.toBytes("testqualifier"); 881 long size = hmc.getActive().getDataSize(); 882 long heapOverhead = hmc.getActive().getHeapSize(); 883 int cellsCount = hmc.getActive().getCellsCount(); 884 int totalLen = 0; 885 for (int i = 0; i < keys.length; i++) { 886 long timestamp = EnvironmentEdgeManager.currentTime(); 887 Threads.sleep(1); // to make sure each kv gets a different ts 888 byte[] row = Bytes.toBytes(keys[i]); 889 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 890 totalLen += Segment.getCellLength(kv); 891 hmc.add(kv, null); 892 LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); 893 } 894 regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size, 895 hmc.getActive().getHeapSize() - heapOverhead, 0, cellsCount); 896 return totalLen; 897 } 898 899 private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { 900 long t = 1234; 901 902 @Override 903 public long currentTime() { 904 return t; 905 } 906 } 907 908 static protected class MyCompactingMemStore extends CompactingMemStore { 909 910 public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store, 911 RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 912 throws IOException { 913 super(conf, c, store, regionServices, compactionPolicy); 914 } 915 916 void disableCompaction() { 917 allowCompaction.set(false); 918 } 919 920 void enableCompaction() { 921 allowCompaction.set(true); 922 } 923 924 void initiateType(MemoryCompactionPolicy compactionType, Configuration conf) 925 throws IllegalArgumentIOException { 926 compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST"); 927 } 928 929 } 930}