001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.concurrent.ThreadPoolExecutor; 024import java.util.concurrent.atomic.AtomicBoolean; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.CellComparator; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.MemoryCompactionPolicy; 030import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.hadoop.hbase.util.ClassSize; 033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 034import org.apache.hadoop.hbase.wal.WAL; 035import org.apache.hadoop.util.StringUtils; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * A memstore implementation which supports in-memory compaction. A compaction pipeline is added 042 * between the active set and the snapshot data structures; it consists of a list of segments that 043 * are subject to compaction. Like the snapshot, all pipeline segments are read-only; updates only 044 * affect the active set. To ensure this property we take advantage of the existing blocking 045 * mechanism -- the active set is pushed to the pipeline while holding the region's updatesLock in 046 * exclusive mode. Periodically, a compaction is applied in the background to all pipeline segments 047 * resulting in a single read-only component. The ``old'' segments are discarded when no scanner is 048 * reading them. 049 */ 050@InterfaceAudience.Private 051public class CompactingMemStore extends AbstractMemStore { 052 053 // The external setting of the compacting MemStore behaviour 054 public static final String COMPACTING_MEMSTORE_TYPE_KEY = 055 "hbase.hregion.compacting.memstore.type"; 056 public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT = 057 String.valueOf(MemoryCompactionPolicy.NONE); 058 // Default fraction of in-memory-flush size w.r.t. flush-to-disk size 059 public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY = 060 "hbase.memstore.inmemoryflush.threshold.factor"; 061 private static final int IN_MEMORY_FLUSH_MULTIPLIER = 1; 062 // In-Memory compaction pool size 063 public static final String IN_MEMORY_CONPACTION_POOL_SIZE_KEY = 064 "hbase.regionserver.inmemory.compaction.pool.size"; 065 public static final int IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT = 10; 066 067 private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class); 068 private HStore store; 069 private CompactionPipeline pipeline; 070 protected MemStoreCompactor compactor; 071 072 private long inmemoryFlushSize; // the threshold on active size for in-memory flush 073 private final AtomicBoolean inMemoryCompactionInProgress = new AtomicBoolean(false); 074 075 // inWalReplay is true while we are synchronously replaying the edits from WAL 076 private boolean inWalReplay = false; 077 078 protected final AtomicBoolean allowCompaction = new AtomicBoolean(true); 079 private boolean compositeSnapshot = true; 080 081 /** 082 * Types of indexes (part of immutable segments) to be used after flattening, compaction, or merge 083 * are applied. 084 */ 085 public enum IndexType { 086 CSLM_MAP, // ConcurrentSkipLisMap 087 ARRAY_MAP, // CellArrayMap 088 CHUNK_MAP // CellChunkMap 089 } 090 091 private IndexType indexType = IndexType.ARRAY_MAP; // default implementation 092 093 public static final long DEEP_OVERHEAD = 094 ClassSize.align(AbstractMemStore.DEEP_OVERHEAD + 6 * ClassSize.REFERENCE // Store, 095 // CompactionPipeline, 096 // MemStoreCompactor, inMemoryCompactionInProgress, 097 // allowCompaction, indexType 098 + Bytes.SIZEOF_LONG // inmemoryFlushSize 099 + 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay 100 + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryCompactionInProgress and allowCompaction 101 + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD); 102 103 public CompactingMemStore(Configuration conf, CellComparator c, HStore store, 104 RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 105 throws IOException { 106 super(conf, c, regionServices); 107 this.store = store; 108 this.regionServices = regionServices; 109 this.pipeline = new CompactionPipeline(getRegionServices()); 110 this.compactor = createMemStoreCompactor(compactionPolicy); 111 if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) { 112 // if user requested to work with MSLABs (whether on- or off-heap), then the 113 // immutable segments are going to use CellChunkMap as their index 114 indexType = IndexType.CHUNK_MAP; 115 } else { 116 indexType = IndexType.ARRAY_MAP; 117 } 118 // initialization of the flush size should happen after initialization of the index type 119 // so do not transfer the following method 120 initInmemoryFlushSize(conf); 121 LOG.info( 122 "Store={}, in-memory flush size threshold={}, immutable segments index type={}, " 123 + "compactor={}", 124 this.store.getColumnFamilyName(), StringUtils.byteDesc(this.inmemoryFlushSize), 125 this.indexType, (this.compactor == null ? "NULL" : this.compactor.toString())); 126 } 127 128 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) 129 throws IllegalArgumentIOException { 130 return new MemStoreCompactor(this, compactionPolicy); 131 } 132 133 private void initInmemoryFlushSize(Configuration conf) { 134 double factor = 0; 135 long memstoreFlushSize = getRegionServices().getMemStoreFlushSize(); 136 int numStores = getRegionServices().getNumStores(); 137 if (numStores <= 1) { 138 // Family number might also be zero in some of our unit test case 139 numStores = 1; 140 } 141 factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.0); 142 if (factor != 0.0) { 143 // multiply by a factor (the same factor for all index types) 144 inmemoryFlushSize = (long) (factor * memstoreFlushSize) / numStores; 145 } else { 146 inmemoryFlushSize = IN_MEMORY_FLUSH_MULTIPLIER 147 * conf.getLong(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); 148 inmemoryFlushSize -= ChunkCreator.SIZEOF_CHUNK_HEADER; 149 } 150 } 151 152 /** 153 * @return Total memory occupied by this MemStore. This won't include any size occupied by the 154 * snapshot. We assume the snapshot will get cleared soon. This is not thread safe and the 155 * memstore may be changed while computing its size. It is the responsibility of the 156 * caller to make sure this doesn't happen. 157 */ 158 @Override 159 public MemStoreSize size() { 160 MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing(); 161 memstoreSizing.incMemStoreSize(getActive().getMemStoreSize()); 162 for (Segment item : pipeline.getSegments()) { 163 memstoreSizing.incMemStoreSize(item.getMemStoreSize()); 164 } 165 return memstoreSizing.getMemStoreSize(); 166 } 167 168 /** 169 * This method is called before the flush is executed. 170 * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush is 171 * executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}. 172 */ 173 @Override 174 public long preFlushSeqIDEstimation() { 175 if (compositeSnapshot) { 176 return HConstants.NO_SEQNUM; 177 } 178 Segment segment = getLastSegment(); 179 if (segment == null) { 180 return HConstants.NO_SEQNUM; 181 } 182 return segment.getMinSequenceId(); 183 } 184 185 @Override 186 public boolean isSloppy() { 187 return true; 188 } 189 190 /** 191 * Push the current active memstore segment into the pipeline and create a snapshot of the tail of 192 * current compaction pipeline Snapshot must be cleared by call to {@link #clearSnapshot}. 193 * {@link #clearSnapshot(long)}. 194 * @return {@link MemStoreSnapshot} 195 */ 196 @Override 197 public MemStoreSnapshot snapshot() { 198 // If snapshot currently has entries, then flusher failed or didn't call 199 // cleanup. Log a warning. 200 if (!this.snapshot.isEmpty()) { 201 LOG.warn("Snapshot called again without clearing previous. " 202 + "Doing nothing. Another ongoing flush or did we fail last attempt?"); 203 } else { 204 LOG.debug("FLUSHING TO DISK {}, store={}", 205 getRegionServices().getRegionInfo().getEncodedName(), getFamilyName()); 206 stopCompaction(); 207 // region level lock ensures pushing active to pipeline is done in isolation 208 // no concurrent update operations trying to flush the active segment 209 pushActiveToPipeline(getActive(), true); 210 resetTimeOfOldestEdit(); 211 snapshotId = EnvironmentEdgeManager.currentTime(); 212 // in both cases whatever is pushed to snapshot is cleared from the pipeline 213 if (compositeSnapshot) { 214 pushPipelineToSnapshot(); 215 } else { 216 pushTailToSnapshot(); 217 } 218 compactor.resetStats(); 219 } 220 return new MemStoreSnapshot(snapshotId, this.snapshot); 221 } 222 223 @Override 224 public MemStoreSize getFlushableSize() { 225 MemStoreSize mss = getSnapshotSize(); 226 if (mss.getDataSize() == 0) { 227 // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed 228 if (compositeSnapshot) { 229 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize()); 230 MutableSegment currActive = getActive(); 231 if (!currActive.isEmpty()) { 232 memStoreSizing.incMemStoreSize(currActive.getMemStoreSize()); 233 } 234 mss = memStoreSizing.getMemStoreSize(); 235 } else { 236 mss = pipeline.getTailSize(); 237 } 238 } 239 return mss.getDataSize() > 0 ? mss : getActive().getMemStoreSize(); 240 } 241 242 public void setInMemoryCompactionCompleted() { 243 inMemoryCompactionInProgress.set(false); 244 } 245 246 protected boolean setInMemoryCompactionFlag() { 247 return inMemoryCompactionInProgress.compareAndSet(false, true); 248 } 249 250 @Override 251 protected long keySize() { 252 // Need to consider dataSize/keySize of all segments in pipeline and active 253 long keySize = getActive().getDataSize(); 254 for (Segment segment : this.pipeline.getSegments()) { 255 keySize += segment.getDataSize(); 256 } 257 return keySize; 258 } 259 260 @Override 261 protected long heapSize() { 262 // Need to consider heapOverhead of all segments in pipeline and active 263 long h = getActive().getHeapSize(); 264 for (Segment segment : this.pipeline.getSegments()) { 265 h += segment.getHeapSize(); 266 } 267 return h; 268 } 269 270 @Override 271 public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfGreater) { 272 long minSequenceId = pipeline.getMinSequenceId(); 273 if (minSequenceId != Long.MAX_VALUE) { 274 byte[] encodedRegionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes(); 275 byte[] familyName = getFamilyNameInBytes(); 276 WAL WAL = getRegionServices().getWAL(); 277 if (WAL != null) { 278 WAL.updateStore(encodedRegionName, familyName, minSequenceId, onlyIfGreater); 279 } 280 } 281 } 282 283 /** 284 * This message intends to inform the MemStore that next coming updates are going to be part of 285 * the replaying edits from WAL 286 */ 287 @Override 288 public void startReplayingFromWAL() { 289 inWalReplay = true; 290 } 291 292 /** 293 * This message intends to inform the MemStore that the replaying edits from WAL are done 294 */ 295 @Override 296 public void stopReplayingFromWAL() { 297 inWalReplay = false; 298 } 299 300 /** 301 * Issue any synchronization and test needed before applying the update For compacting memstore 302 * this means checking the update can increase the size without overflow 303 * @param currentActive the segment to be updated 304 * @param cell the cell to be added 305 * @param memstoreSizing object to accumulate region size changes 306 * @return true iff can proceed with applying the update 307 */ 308 @Override 309 protected boolean preUpdate(MutableSegment currentActive, Cell cell, 310 MemStoreSizing memstoreSizing) { 311 if (currentActive.sharedLock()) { 312 if (checkAndAddToActiveSize(currentActive, cell, memstoreSizing)) { 313 return true; 314 } 315 currentActive.sharedUnlock(); 316 } 317 return false; 318 } 319 320 @Override 321 protected void postUpdate(MutableSegment currentActive) { 322 currentActive.sharedUnlock(); 323 } 324 325 @Override 326 protected boolean sizeAddedPreOperation() { 327 return true; 328 } 329 330 // the getSegments() method is used for tests only 331 @Override 332 protected List<Segment> getSegments() { 333 List<? extends Segment> pipelineList = pipeline.getSegments(); 334 List<Segment> list = new ArrayList<>(pipelineList.size() + 2); 335 list.add(getActive()); 336 list.addAll(pipelineList); 337 list.addAll(snapshot.getAllSegments()); 338 339 return list; 340 } 341 342 // the following three methods allow to manipulate the settings of composite snapshot 343 public void setCompositeSnapshot(boolean useCompositeSnapshot) { 344 this.compositeSnapshot = useCompositeSnapshot; 345 } 346 347 public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result, 348 boolean merge) { 349 // last true stands for updating the region size 350 return pipeline.swap(versionedList, result, !merge, true); 351 } 352 353 /** 354 * @param requesterVersion The caller must hold the VersionedList of the pipeline with version 355 * taken earlier. This version must be passed as a parameter here. The 356 * flattening happens only if versions match. 357 */ 358 public void flattenOneSegment(long requesterVersion, MemStoreCompactionStrategy.Action action) { 359 pipeline.flattenOneSegment(requesterVersion, indexType, action); 360 } 361 362 // setter is used only for testability 363 void setIndexType(IndexType type) { 364 indexType = type; 365 // Because this functionality is for testing only and tests are setting in-memory flush size 366 // according to their need, there is no setting of in-memory flush size, here. 367 // If it is needed, please change in-memory flush size explicitly 368 } 369 370 public IndexType getIndexType() { 371 return indexType; 372 } 373 374 public boolean hasImmutableSegments() { 375 return !pipeline.isEmpty(); 376 } 377 378 public VersionedSegmentsList getImmutableSegments() { 379 return pipeline.getVersionedList(); 380 } 381 382 public long getSmallestReadPoint() { 383 return store.getSmallestReadPoint(); 384 } 385 386 public HStore getStore() { 387 return store; 388 } 389 390 public String getFamilyName() { 391 return Bytes.toString(getFamilyNameInBytes()); 392 } 393 394 /** 395 * This method is protected under {@link HStore#lock} read lock. 396 */ 397 @Override 398 public List<KeyValueScanner> getScanners(long readPt) throws IOException { 399 MutableSegment activeTmp = getActive(); 400 List<? extends Segment> pipelineList = pipeline.getSegments(); 401 List<? extends Segment> snapshotList = snapshot.getAllSegments(); 402 long numberOfSegments = 1L + pipelineList.size() + snapshotList.size(); 403 // The list of elements in pipeline + the active element + the snapshot segment 404 List<KeyValueScanner> list = createList((int) numberOfSegments); 405 addToScanners(activeTmp, readPt, list); 406 addToScanners(pipelineList, readPt, list); 407 addToScanners(snapshotList, readPt, list); 408 return list; 409 } 410 411 protected List<KeyValueScanner> createList(int capacity) { 412 return new ArrayList<>(capacity); 413 } 414 415 /** 416 * Check whether anything need to be done based on the current active set size. The method is 417 * invoked upon every addition to the active set. For CompactingMemStore, flush the active set to 418 * the read-only memory if it's size is above threshold 419 * @param currActive intended segment to update 420 * @param cellToAdd cell to be added to the segment 421 * @param memstoreSizing object to accumulate changed size 422 * @return true if the cell can be added to the currActive 423 */ 424 protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd, 425 MemStoreSizing memstoreSizing) { 426 long cellSize = MutableSegment.getCellLength(cellToAdd); 427 boolean successAdd = false; 428 while (true) { 429 long segmentDataSize = currActive.getDataSize(); 430 if (!inWalReplay && segmentDataSize > inmemoryFlushSize) { 431 // when replaying edits from WAL there is no need in in-memory flush regardless the size 432 // otherwise size below flush threshold try to update atomically 433 break; 434 } 435 if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) { 436 if (memstoreSizing != null) { 437 memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0); 438 } 439 successAdd = true; 440 break; 441 } 442 } 443 444 if (!inWalReplay && currActive.getDataSize() > inmemoryFlushSize) { 445 // size above flush threshold so we flush in memory 446 this.tryFlushInMemoryAndCompactingAsync(currActive); 447 } 448 return successAdd; 449 } 450 451 /** 452 * Try to flush the currActive in memory and submit the background 453 * {@link InMemoryCompactionRunnable} to 454 * {@link RegionServicesForStores#getInMemoryCompactionPool()}. Just one thread can do the actual 455 * flushing in memory. 456 * @param currActive current Active Segment to be flush in memory. 457 */ 458 private void tryFlushInMemoryAndCompactingAsync(MutableSegment currActive) { 459 if (currActive.setInMemoryFlushed()) { 460 flushInMemory(currActive); 461 if (setInMemoryCompactionFlag()) { 462 // The thread is dispatched to do in-memory compaction in the background 463 InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable(); 464 if (LOG.isTraceEnabled()) { 465 LOG.trace( 466 "Dispatching the MemStore in-memory flush for store " + store.getColumnFamilyName()); 467 } 468 getPool().execute(runnable); 469 } 470 } 471 } 472 473 // externally visible only for tests 474 // when invoked directly from tests it must be verified that the caller doesn't hold updatesLock, 475 // otherwise there is a deadlock 476 void flushInMemory() { 477 MutableSegment currActive = getActive(); 478 if (currActive.setInMemoryFlushed()) { 479 flushInMemory(currActive); 480 } 481 inMemoryCompaction(); 482 } 483 484 protected void flushInMemory(MutableSegment currActive) { 485 LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline"); 486 // NOTE: Due to concurrent writes and because we first add cell size to currActive.getDataSize 487 // and then actually add cell to currActive.cellSet, it is possible that 488 // currActive.getDataSize could not accommodate cellToAdd but currActive.cellSet is still 489 // empty if pending writes which not yet add cells to currActive.cellSet. 490 // so here we should not check currActive.isEmpty or not. 491 pushActiveToPipeline(currActive, false); 492 } 493 494 void inMemoryCompaction() { 495 // setting the inMemoryCompactionInProgress flag again for the case this method is invoked 496 // directly (only in tests) in the common path setting from true to true is idempotent 497 inMemoryCompactionInProgress.set(true); 498 // Used by tests 499 if (!allowCompaction.get()) { 500 return; 501 } 502 try { 503 // Speculative compaction execution, may be interrupted if flush is forced while 504 // compaction is in progress 505 if (!compactor.start()) { 506 setInMemoryCompactionCompleted(); 507 } 508 } catch (IOException e) { 509 LOG.warn("Unable to run in-memory compaction on {}/{}; exception={}", 510 getRegionServices().getRegionInfo().getEncodedName(), getFamilyName(), e); 511 } 512 } 513 514 private Segment getLastSegment() { 515 Segment localActive = getActive(); 516 Segment tail = pipeline.getTail(); 517 return tail == null ? localActive : tail; 518 } 519 520 private byte[] getFamilyNameInBytes() { 521 return store.getColumnFamilyDescriptor().getName(); 522 } 523 524 private ThreadPoolExecutor getPool() { 525 return getRegionServices().getInMemoryCompactionPool(); 526 } 527 528 /** 529 * The request to cancel the compaction asynchronous task (caused by in-memory flush) The 530 * compaction may still happen if the request was sent too late Non-blocking request 531 */ 532 private void stopCompaction() { 533 if (inMemoryCompactionInProgress.get()) { 534 compactor.stop(); 535 } 536 } 537 538 /** 539 * NOTE: When {@link CompactingMemStore#flushInMemory(MutableSegment)} calls this method, due to 540 * concurrent writes and because we first add cell size to currActive.getDataSize and then 541 * actually add cell to currActive.cellSet, it is possible that currActive.getDataSize could not 542 * accommodate cellToAdd but currActive.cellSet is still empty if pending writes which not yet add 543 * cells to currActive.cellSet,so for 544 * {@link CompactingMemStore#flushInMemory(MutableSegment)},checkEmpty parameter is false. But if 545 * {@link CompactingMemStore#snapshot} called this method,because there is no pending 546 * write,checkEmpty parameter could be true. 547 */ 548 protected void pushActiveToPipeline(MutableSegment currActive, boolean checkEmpty) { 549 if (!checkEmpty || !currActive.isEmpty()) { 550 pipeline.pushHead(currActive); 551 resetActive(); 552 } 553 } 554 555 private void pushTailToSnapshot() { 556 VersionedSegmentsList segments = pipeline.getVersionedTail(); 557 pushToSnapshot(segments.getStoreSegments()); 558 // In Swap: don't close segments (they are in snapshot now) and don't update the region size 559 pipeline.swap(segments, null, false, false); 560 } 561 562 private void pushPipelineToSnapshot() { 563 int iterationsCnt = 0; 564 boolean done = false; 565 while (!done) { 566 iterationsCnt++; 567 VersionedSegmentsList segments = getImmutableSegments(); 568 pushToSnapshot(segments.getStoreSegments()); 569 // swap can return false in case the pipeline was updated by ongoing compaction 570 // and the version increase, the chance of it happenning is very low 571 // In Swap: don't close segments (they are in snapshot now) and don't update the region size 572 done = swapPipelineWithNull(segments); 573 if (iterationsCnt > 2) { 574 // practically it is impossible that this loop iterates more than two times 575 // (because the compaction is stopped and none restarts it while in snapshot request), 576 // however stopping here for the case of the infinite loop causing by any error 577 LOG.warn("Multiple unsuccessful attempts to push the compaction pipeline to snapshot," 578 + " while flushing to disk."); 579 this.snapshot = SegmentFactory.instance().createImmutableSegment(getComparator()); 580 break; 581 } 582 } 583 } 584 585 protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { 586 return pipeline.swap(segments, null, false, false); 587 } 588 589 private void pushToSnapshot(List<ImmutableSegment> segments) { 590 if (segments.isEmpty()) return; 591 if (segments.size() == 1 && !segments.get(0).isEmpty()) { 592 this.snapshot = segments.get(0); 593 return; 594 } else { // create composite snapshot 595 this.snapshot = 596 SegmentFactory.instance().createCompositeImmutableSegment(getComparator(), segments); 597 } 598 } 599 600 private RegionServicesForStores getRegionServices() { 601 return regionServices; 602 } 603 604 /** 605 * The in-memory-flusher thread performs the flush asynchronously. There is at most one thread per 606 * memstore instance. It takes the updatesLock exclusively, pushes active into the pipeline, 607 * releases updatesLock and compacts the pipeline. 608 */ 609 private class InMemoryCompactionRunnable implements Runnable { 610 @Override 611 public void run() { 612 inMemoryCompaction(); 613 } 614 } 615 616 boolean isMemStoreFlushingInMemory() { 617 return inMemoryCompactionInProgress.get(); 618 } 619 620 /** 621 * @param cell Find the row that comes after this one. If null, we return the first. 622 * @return Next row or null if none found. 623 */ 624 Cell getNextRow(final Cell cell) { 625 Cell lowest = null; 626 List<Segment> segments = getSegments(); 627 for (Segment segment : segments) { 628 if (lowest == null) { 629 lowest = getNextRow(cell, segment.getCellSet()); 630 } else { 631 lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet())); 632 } 633 } 634 return lowest; 635 } 636 637 long getInmemoryFlushSize() { 638 return inmemoryFlushSize; 639 } 640 641 // debug method 642 public void debug() { 643 String msg = "active size=" + getActive().getDataSize(); 644 msg += " allow compaction is " + (allowCompaction.get() ? "true" : "false"); 645 msg += 646 " inMemoryCompactionInProgress is " + (inMemoryCompactionInProgress.get() ? "true" : "false"); 647 LOG.debug(msg); 648 } 649 650}