/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
046import org.apache.hadoop.hbase.conf.ConfigurationObserver; 047import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 048import org.apache.hadoop.hbase.trace.TraceUtil; 049import org.apache.hadoop.hbase.util.Bytes; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 052import org.apache.hadoop.hbase.util.Threads; 053import org.apache.hadoop.ipc.RemoteException; 054import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 060 061/** 062 * Thread that flushes cache on request NOTE: This class extends Thread rather than Chore because 063 * the sleep time can be interrupted when there is something to do, rather than the Chore sleep time 064 * which is invariant. 065 * @see FlushRequester 066 */ 067@InterfaceAudience.Private 068public class MemStoreFlusher implements FlushRequester, ConfigurationObserver { 069 private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class); 070 071 private Configuration conf; 072 // These two data members go together. Any entry in the one must have 073 // a corresponding entry in the other. 
074 private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>(); 075 protected final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>(); 076 private AtomicBoolean wakeupPending = new AtomicBoolean(); 077 078 private final long threadWakeFrequency; 079 private final HRegionServer server; 080 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 081 private final Object blockSignal = new Object(); 082 083 private long blockingWaitTime; 084 private final LongAdder updatesBlockedMsHighWater = new LongAdder(); 085 086 private FlushHandler[] flushHandlers; 087 088 private final AtomicInteger flusherIdGen = new AtomicInteger(); 089 090 private ThreadFactory flusherThreadFactory; 091 092 private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1); 093 094 /** 095 * Singleton instance inserted into flush queue used for signaling. 096 */ 097 private static final FlushQueueEntry WAKEUPFLUSH_INSTANCE = new FlushQueueEntry() { 098 @Override 099 public long getDelay(TimeUnit unit) { 100 return 0; 101 } 102 103 @Override 104 public int compareTo(Delayed o) { 105 return -1; 106 } 107 108 @Override 109 public boolean equals(Object obj) { 110 return obj == this; 111 } 112 113 @Override 114 public int hashCode() { 115 return 42; 116 } 117 }; 118 119 /** 120 * */ 121 public MemStoreFlusher(final Configuration conf, final HRegionServer server) { 122 super(); 123 this.conf = conf; 124 this.server = server; 125 this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); 126 this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000); 127 int handlerCount = 0; 128 if (server != null) { 129 handlerCount = getHandlerCount(conf); 130 LOG.info("globalMemStoreLimit=" 131 + TraditionalBinaryPrefix 132 .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1) 133 + ", globalMemStoreLimitLowMark=" 134 + TraditionalBinaryPrefix.long2String( 135 
this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1) 136 + ", Offheap=" + (this.server.getRegionServerAccounting().isOffheap())); 137 } 138 this.flushHandlers = new FlushHandler[handlerCount]; 139 } 140 141 public LongAdder getUpdatesBlockedMsHighWater() { 142 return this.updatesBlockedMsHighWater; 143 } 144 145 /** 146 * The memstore across all regions has exceeded the low water mark. Pick one region to flush and 147 * flush it synchronously (this is called from the flush thread) 148 * @return true if successful 149 */ 150 private boolean flushOneForGlobalPressure(FlushType flushType) { 151 SortedMap<Long, HRegion> regionsBySize = null; 152 switch (flushType) { 153 case ABOVE_OFFHEAP_HIGHER_MARK: 154 case ABOVE_OFFHEAP_LOWER_MARK: 155 regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize(); 156 break; 157 case ABOVE_ONHEAP_HIGHER_MARK: 158 case ABOVE_ONHEAP_LOWER_MARK: 159 default: 160 regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize(); 161 } 162 Set<HRegion> excludedRegions = new HashSet<>(); 163 164 double secondaryMultiplier = 165 ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf); 166 167 boolean flushedOne = false; 168 while (!flushedOne) { 169 // Find the biggest region that doesn't have too many storefiles (might be null!) 170 HRegion bestFlushableRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, true); 171 // Find the biggest region, total, even if it might have too many flushes. 172 HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false); 173 // Find the biggest region that is a secondary region 174 HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions); 175 if (bestAnyRegion == null) { 176 // If bestAnyRegion is null, assign replica. It may be null too. 
Next step is check for null 177 bestAnyRegion = bestRegionReplica; 178 } 179 if (bestAnyRegion == null) { 180 LOG.error("Above memory mark but there are no flushable regions!"); 181 return false; 182 } 183 184 HRegion regionToFlush; 185 long bestAnyRegionSize; 186 long bestFlushableRegionSize; 187 switch (flushType) { 188 case ABOVE_OFFHEAP_HIGHER_MARK: 189 case ABOVE_OFFHEAP_LOWER_MARK: 190 bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize(); 191 bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion); 192 break; 193 194 case ABOVE_ONHEAP_HIGHER_MARK: 195 case ABOVE_ONHEAP_LOWER_MARK: 196 bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize(); 197 bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion); 198 break; 199 200 default: 201 bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize(); 202 bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion); 203 } 204 if (bestAnyRegionSize > 2 * bestFlushableRegionSize) { 205 // Even if it's not supposed to be flushed, pick a region if it's more than twice 206 // as big as the best flushable one - otherwise when we're under pressure we make 207 // lots of little flushes and cause lots of compactions, etc, which just makes 208 // life worse! 209 if (LOG.isDebugEnabled()) { 210 LOG.debug("Under global heap pressure: " + "Region " 211 + bestAnyRegion.getRegionInfo().getRegionNameAsString() + " has too many " 212 + "store files, but is " + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1) 213 + " vs best flushable region's " 214 + TraditionalBinaryPrefix.long2String(bestFlushableRegionSize, "", 1) 215 + ". 
Choosing the bigger."); 216 } 217 regionToFlush = bestAnyRegion; 218 } else { 219 if (bestFlushableRegion == null) { 220 regionToFlush = bestAnyRegion; 221 } else { 222 regionToFlush = bestFlushableRegion; 223 } 224 } 225 226 long regionToFlushSize; 227 long bestRegionReplicaSize; 228 switch (flushType) { 229 case ABOVE_OFFHEAP_HIGHER_MARK: 230 case ABOVE_OFFHEAP_LOWER_MARK: 231 regionToFlushSize = regionToFlush.getMemStoreOffHeapSize(); 232 bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica); 233 break; 234 235 case ABOVE_ONHEAP_HIGHER_MARK: 236 case ABOVE_ONHEAP_LOWER_MARK: 237 regionToFlushSize = regionToFlush.getMemStoreHeapSize(); 238 bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica); 239 break; 240 241 default: 242 regionToFlushSize = regionToFlush.getMemStoreDataSize(); 243 bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica); 244 } 245 246 if ((regionToFlush == null || regionToFlushSize == 0) && bestRegionReplicaSize == 0) { 247 // A concurrency issue (such as splitting region) may happen such that the online region 248 // seen by getCopyOfOnlineRegionsSortedByXX() method is no longer eligible to 249 // getBiggestMemStoreRegion(). This means that we can come out of the loop 250 LOG.debug("Above memory mark but there is no flushable region"); 251 return false; 252 } 253 254 if ( 255 regionToFlush == null || (bestRegionReplica != null 256 && ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) 257 && (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize)) 258 ) { 259 LOG.info("Refreshing storefiles of region " + bestRegionReplica 260 + " due to global heap pressure. 
Total memstore off heap size=" 261 + TraditionalBinaryPrefix 262 .long2String(server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) 263 + " memstore heap size=" + TraditionalBinaryPrefix 264 .long2String(server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1)); 265 flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica); 266 if (!flushedOne) { 267 LOG.info("Excluding secondary region " + bestRegionReplica 268 + " - trying to find a different region to refresh files."); 269 excludedRegions.add(bestRegionReplica); 270 } 271 } else { 272 LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " 273 + "Flush type=" + flushType.toString() + ", Total Memstore Heap size=" 274 + TraditionalBinaryPrefix 275 .long2String(server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1) 276 + ", Total Memstore Off-Heap size=" 277 + TraditionalBinaryPrefix 278 .long2String(server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) 279 + ", Region memstore size=" 280 + TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1)); 281 flushedOne = flushRegion(regionToFlush, true, null, FlushLifeCycleTracker.DUMMY); 282 283 if (!flushedOne) { 284 LOG.info("Excluding unflushable region " + regionToFlush 285 + " - trying to find a different region to flush."); 286 excludedRegions.add(regionToFlush); 287 } 288 } 289 } 290 return true; 291 } 292 293 /** Returns Return memstore offheap size or null if <code>r</code> is null */ 294 private static long getMemStoreOffHeapSize(HRegion r) { 295 return r == null ? 0 : r.getMemStoreOffHeapSize(); 296 } 297 298 /** Returns Return memstore heap size or null if <code>r</code> is null */ 299 private static long getMemStoreHeapSize(HRegion r) { 300 return r == null ? 
0 : r.getMemStoreHeapSize(); 301 } 302 303 /** Returns Return memstore data size or null if <code>r</code> is null */ 304 private static long getMemStoreDataSize(HRegion r) { 305 return r == null ? 0 : r.getMemStoreDataSize(); 306 } 307 308 private class FlushHandler extends Thread { 309 310 private final AtomicBoolean running = new AtomicBoolean(true); 311 312 private FlushHandler(String name) { 313 super(name); 314 } 315 316 @Override 317 public void run() { 318 while (!server.isStopped() && running.get()) { 319 FlushQueueEntry fqe = null; 320 try { 321 wakeupPending.set(false); // allow someone to wake us up again 322 fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); 323 if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) { 324 FlushType type = isAboveLowWaterMark(); 325 if (type != FlushType.NORMAL) { 326 LOG.debug("Flush thread woke up because memory above low water=" 327 + TraditionalBinaryPrefix.long2String( 328 server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)); 329 // For offheap memstore, even if the lower water mark was breached due to heap 330 // overhead 331 // we still select the regions based on the region's memstore data size. 332 // TODO : If we want to decide based on heap over head it can be done without tracking 333 // it per region. 334 if (!flushOneForGlobalPressure(type)) { 335 // Wasn't able to flush any region, but we're above low water mark 336 // This is unlikely to happen, but might happen when closing the 337 // entire server - another thread is flushing regions. 
We'll just 338 // sleep a little bit to avoid spinning, and then pretend that 339 // we flushed one, so anyone blocked will check again 340 Thread.sleep(1000); 341 wakeUpIfBlocking(); 342 } 343 // Enqueue another one of these tokens so we'll wake up again 344 wakeupFlushThread(); 345 } 346 continue; 347 } 348 FlushRegionEntry fre = (FlushRegionEntry) fqe; 349 if (!flushRegion(fre)) { 350 break; 351 } 352 } catch (InterruptedException ex) { 353 continue; 354 } catch (ConcurrentModificationException ex) { 355 continue; 356 } catch (Exception ex) { 357 LOG.error("Cache flusher failed for entry " + fqe, ex); 358 if (!server.checkFileSystem()) { 359 break; 360 } 361 } 362 } 363 364 if (server.isStopped()) { 365 synchronized (regionsInQueue) { 366 regionsInQueue.clear(); 367 flushQueue.clear(); 368 } 369 370 // Signal anyone waiting, so they see the close flag 371 wakeUpIfBlocking(); 372 } 373 LOG.info(getName() + " exiting"); 374 } 375 376 public void shutdown() { 377 if (!running.compareAndSet(true, false)) { 378 LOG.warn("{} is already signaled to shutdown", getName()); 379 } 380 } 381 } 382 383 private void wakeupFlushThread() { 384 if (wakeupPending.compareAndSet(false, true)) { 385 flushQueue.add(WAKEUPFLUSH_INSTANCE); 386 } 387 } 388 389 private HRegion getBiggestMemStoreRegion(SortedMap<Long, HRegion> regionsBySize, 390 Set<HRegion> excludedRegions, boolean checkStoreFileCount) { 391 synchronized (regionsInQueue) { 392 for (HRegion region : regionsBySize.values()) { 393 if (excludedRegions.contains(region)) { 394 continue; 395 } 396 397 if (region.writestate.flushing || !region.writestate.writesEnabled) { 398 continue; 399 } 400 401 if (checkStoreFileCount && isTooManyStoreFiles(region)) { 402 continue; 403 } 404 return region; 405 } 406 } 407 return null; 408 } 409 410 private HRegion getBiggestMemStoreOfRegionReplica(SortedMap<Long, HRegion> regionsBySize, 411 Set<HRegion> excludedRegions) { 412 synchronized (regionsInQueue) { 413 for (HRegion region : 
regionsBySize.values()) { 414 if (excludedRegions.contains(region)) { 415 continue; 416 } 417 418 if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 419 continue; 420 } 421 422 return region; 423 } 424 } 425 return null; 426 } 427 428 private boolean refreshStoreFilesAndReclaimMemory(Region region) { 429 try { 430 return region.refreshStoreFiles(); 431 } catch (IOException e) { 432 LOG.warn("Refreshing store files failed with exception", e); 433 } 434 return false; 435 } 436 437 /** 438 * Return the FlushType if global memory usage is above the high watermark 439 */ 440 private FlushType isAboveHighWaterMark() { 441 return server.getRegionServerAccounting().isAboveHighWaterMark(); 442 } 443 444 /** 445 * Return the FlushType if we're above the low watermark 446 */ 447 private FlushType isAboveLowWaterMark() { 448 return server.getRegionServerAccounting().isAboveLowWaterMark(); 449 } 450 451 @Override 452 public boolean requestFlush(HRegion r, FlushLifeCycleTracker tracker) { 453 return this.requestFlush(r, null, tracker); 454 } 455 456 @Override 457 public boolean requestFlush(HRegion r, List<byte[]> families, FlushLifeCycleTracker tracker) { 458 synchronized (regionsInQueue) { 459 FlushRegionEntry existFqe = regionsInQueue.get(r); 460 if (existFqe != null) { 461 // if a delayed one exists and not reach the time to execute, just remove it 462 if (existFqe.isDelay() && existFqe.whenToExpire > EnvironmentEdgeManager.currentTime()) { 463 LOG.info("Remove the existing delayed flush entry for {}, " 464 + "because we need to flush it immediately", r); 465 this.regionsInQueue.remove(r); 466 this.flushQueue.remove(existFqe); 467 r.decrementFlushesQueuedCount(); 468 } else { 469 tracker.notExecuted("Flush already requested on " + r); 470 return false; 471 } 472 } 473 474 // This entry has no delay so it will be added at the top of the flush 475 // queue. It'll come out near immediately. 
476 FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker); 477 this.regionsInQueue.put(r, fqe); 478 this.flushQueue.add(fqe); 479 r.incrementFlushesQueuedCount(); 480 return true; 481 } 482 } 483 484 @Override 485 public boolean requestDelayedFlush(HRegion r, long delay) { 486 synchronized (regionsInQueue) { 487 if (!regionsInQueue.containsKey(r)) { 488 // This entry has some delay 489 FlushRegionEntry fqe = new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY); 490 fqe.requeue(delay); 491 this.regionsInQueue.put(r, fqe); 492 this.flushQueue.add(fqe); 493 r.incrementFlushesQueuedCount(); 494 return true; 495 } 496 return false; 497 } 498 } 499 500 public int getFlushQueueSize() { 501 return flushQueue.size(); 502 } 503 504 /** 505 * Only interrupt once it's done with a run through the work loop. 506 */ 507 void interruptIfNecessary() { 508 lock.writeLock().lock(); 509 try { 510 for (FlushHandler flushHandler : flushHandlers) { 511 if (flushHandler != null) { 512 flushHandler.interrupt(); 513 } 514 } 515 } finally { 516 lock.writeLock().unlock(); 517 } 518 } 519 520 synchronized void start(UncaughtExceptionHandler eh) { 521 this.flusherThreadFactory = 522 new ThreadFactoryBuilder().setDaemon(true).setUncaughtExceptionHandler(eh).build(); 523 lock.readLock().lock(); 524 try { 525 startFlushHandlerThreads(flushHandlers, 0, flushHandlers.length); 526 } finally { 527 lock.readLock().unlock(); 528 } 529 } 530 531 boolean isAlive() { 532 lock.readLock().lock(); 533 try { 534 for (FlushHandler flushHandler : flushHandlers) { 535 if (flushHandler != null && flushHandler.isAlive()) { 536 return true; 537 } 538 } 539 return false; 540 } finally { 541 lock.readLock().unlock(); 542 } 543 } 544 545 void shutdown() { 546 lock.readLock().lock(); 547 try { 548 for (FlushHandler flushHandler : flushHandlers) { 549 if (flushHandler != null) { 550 Threads.shutdown(flushHandler); 551 } 552 } 553 } finally { 554 lock.readLock().unlock(); 555 } 556 } 557 558 /** 559 * A 
flushRegion that checks store file count. If too many, puts the flush on delay queue to retry 560 * later. 561 * @return true if the region was successfully flushed, false otherwise. If false, there will be 562 * accompanying log messages explaining why the region was not flushed. 563 */ 564 private boolean flushRegion(final FlushRegionEntry fqe) { 565 HRegion region = fqe.region; 566 if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) { 567 if (fqe.isMaximumWait(this.blockingWaitTime)) { 568 LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) 569 + "ms on a compaction to clean up 'too many store files'; waited " 570 + "long enough... proceeding with flush of " 571 + region.getRegionInfo().getRegionNameAsString()); 572 } else { 573 // If this is first time we've been put off, then emit a log message. 574 if (fqe.getRequeueCount() <= 0) { 575 // Note: We don't impose blockingStoreFiles constraint on meta regions 576 LOG.warn("{} has too many store files({}); delaying flush up to {} ms", 577 region.getRegionInfo().getEncodedName(), getStoreFileCount(region), 578 this.blockingWaitTime); 579 final CompactSplit compactSplitThread = server.getCompactSplitThread(); 580 if (!compactSplitThread.requestSplit(region)) { 581 try { 582 compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName()); 583 } catch (IOException e) { 584 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 585 LOG.error("Cache flush failed for region " 586 + Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e); 587 } 588 } 589 } 590 591 // Put back on the queue. Have it come back out of the queue 592 // after a delay of this.blockingWaitTime / 100 ms. 
593 this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100)); 594 // Tell a lie, it's not flushed but it's ok 595 return true; 596 } 597 } 598 return flushRegion(region, false, fqe.families, fqe.getTracker()); 599 } 600 601 /** 602 * Flush a region. 603 * @param region Region to flush. 604 * @param emergencyFlush Set if we are being force flushed. If true the region needs to be removed 605 * from the flush queue. If false, when we were called from the main flusher 606 * run loop and we got the entry to flush by calling poll on the flush queue 607 * (which removed it). 608 * @param families stores of region to flush. 609 * @return true if the region was successfully flushed, false otherwise. If false, there will be 610 * accompanying log messages explaining why the region was not flushed. 611 */ 612 private boolean flushRegion(HRegion region, boolean emergencyFlush, List<byte[]> families, 613 FlushLifeCycleTracker tracker) { 614 synchronized (this.regionsInQueue) { 615 FlushRegionEntry fqe = this.regionsInQueue.remove(region); 616 // Use the start time of the FlushRegionEntry if available 617 if (fqe != null && emergencyFlush) { 618 // Need to remove from region from delay queue. When NOT an 619 // emergencyFlush, then item was removed via a flushQueue.poll. 
620 flushQueue.remove(fqe); 621 } 622 } 623 624 tracker.beforeExecution(); 625 lock.readLock().lock(); 626 final CompactSplit compactSplitThread = server.getCompactSplitThread(); 627 try { 628 notifyFlushRequest(region, emergencyFlush); 629 FlushResult flushResult = region.flushcache(families, false, tracker); 630 boolean shouldCompact = flushResult.isCompactionNeeded(); 631 // We just want to check the size 632 boolean shouldSplit = region.checkSplit().isPresent(); 633 if (shouldSplit) { 634 compactSplitThread.requestSplit(region); 635 } else if (shouldCompact) { 636 compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName()); 637 } 638 } catch (DroppedSnapshotException ex) { 639 // Cache flush can fail in a few places. If it fails in a critical 640 // section, we get a DroppedSnapshotException and a replay of wal 641 // is required. Currently the only way to do this is a restart of 642 // the server. Abort because hdfs is probably bad (HBASE-644 is a case 643 // where hdfs was bad but passed the hdfs check). 644 server.abort("Replay of WAL required. Forcing server shutdown", ex); 645 return false; 646 } catch (IOException ex) { 647 ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; 648 LOG.error("Cache flush failed" + (region != null 649 ? 
(" for region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName())) 650 : ""), ex); 651 if (!server.checkFileSystem()) { 652 return false; 653 } 654 } finally { 655 lock.readLock().unlock(); 656 wakeUpIfBlocking(); 657 tracker.afterExecution(); 658 } 659 return true; 660 } 661 662 private void notifyFlushRequest(Region region, boolean emergencyFlush) { 663 FlushType type = null; 664 if (emergencyFlush) { 665 type = isAboveHighWaterMark(); 666 } 667 if (type == null) { 668 type = isAboveLowWaterMark(); 669 } 670 for (FlushRequestListener listener : flushRequestListeners) { 671 listener.flushRequested(type, region); 672 } 673 } 674 675 private void wakeUpIfBlocking() { 676 synchronized (blockSignal) { 677 blockSignal.notifyAll(); 678 } 679 } 680 681 private boolean isTooManyStoreFiles(Region region) { 682 683 // When compaction is disabled, the region is flushable 684 if (!region.getTableDescriptor().isCompactionEnabled()) { 685 return false; 686 } 687 688 for (Store store : region.getStores()) { 689 if (store.hasTooManyStoreFiles()) { 690 return true; 691 } 692 } 693 return false; 694 } 695 696 private int getStoreFileCount(Region region) { 697 int count = 0; 698 for (Store store : region.getStores()) { 699 count += store.getStorefilesCount(); 700 } 701 return count; 702 } 703 704 /** 705 * Check if the regionserver's memstore memory usage is greater than the limit. If so, flush 706 * regions with the biggest memstores until we're down to the lower limit. This method blocks 707 * callers until we're down to a safe amount of memstore consumption. 708 */ 709 public void reclaimMemStoreMemory() { 710 Span span = 711 TraceUtil.getGlobalTracer().spanBuilder("MemStoreFluser.reclaimMemStoreMemory").startSpan(); 712 try (Scope scope = span.makeCurrent()) { 713 FlushType flushType = isAboveHighWaterMark(); 714 if (flushType != FlushType.NORMAL) { 715 span.addEvent("Force Flush. 
We're above high water mark."); 716 long start = EnvironmentEdgeManager.currentTime(); 717 long nextLogTimeMs = start; 718 synchronized (this.blockSignal) { 719 boolean blocked = false; 720 long startTime = 0; 721 boolean interrupted = false; 722 try { 723 flushType = isAboveHighWaterMark(); 724 while (flushType != FlushType.NORMAL && !server.isStopped()) { 725 if (!blocked) { 726 startTime = EnvironmentEdgeManager.currentTime(); 727 if (!server.getRegionServerAccounting().isOffheap()) { 728 logMsg("global memstore heapsize", 729 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), 730 server.getRegionServerAccounting().getGlobalMemStoreLimit()); 731 } else { 732 switch (flushType) { 733 case ABOVE_OFFHEAP_HIGHER_MARK: 734 logMsg("the global offheap memstore datasize", 735 server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), 736 server.getRegionServerAccounting().getGlobalMemStoreLimit()); 737 break; 738 case ABOVE_ONHEAP_HIGHER_MARK: 739 logMsg("global memstore heapsize", 740 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), 741 server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit()); 742 break; 743 default: 744 break; 745 } 746 } 747 } 748 blocked = true; 749 wakeupFlushThread(); 750 try { 751 // we should be able to wait forever, but we've seen a bug where 752 // we miss a notify, so put a 5 second bound on it at least. 
753 blockSignal.wait(5 * 1000); 754 } catch (InterruptedException ie) { 755 LOG.warn("Interrupted while waiting"); 756 interrupted = true; 757 } 758 long nowMs = EnvironmentEdgeManager.currentTime(); 759 if (nowMs >= nextLogTimeMs) { 760 LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start); 761 nextLogTimeMs = nowMs + 1000; 762 } 763 flushType = isAboveHighWaterMark(); 764 } 765 } finally { 766 if (interrupted) { 767 Thread.currentThread().interrupt(); 768 } 769 } 770 771 if (blocked) { 772 final long totalTime = EnvironmentEdgeManager.currentTime() - startTime; 773 if (totalTime > 0) { 774 this.updatesBlockedMsHighWater.add(totalTime); 775 } 776 LOG.info("Unblocking updates for server " + server.toString()); 777 } 778 } 779 } else { 780 flushType = isAboveLowWaterMark(); 781 if (flushType != FlushType.NORMAL) { 782 wakeupFlushThread(); 783 } 784 span.end(); 785 } 786 } 787 } 788 789 private void logMsg(String type, long val, long max) { 790 LOG.info("Blocking updates: {} {} is >= blocking {}", type, 791 TraditionalBinaryPrefix.long2String(val, "", 1), 792 TraditionalBinaryPrefix.long2String(max, "", 1)); 793 } 794 795 @Override 796 public String toString() { 797 return "flush_queue=" + flushQueue.size(); 798 } 799 800 public String dumpQueue() { 801 StringBuilder queueList = new StringBuilder(); 802 queueList.append("Flush Queue Queue dump:\n"); 803 queueList.append(" Flush Queue:\n"); 804 java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator(); 805 806 while (it.hasNext()) { 807 queueList.append(" " + it.next().toString()); 808 queueList.append("\n"); 809 } 810 811 return queueList.toString(); 812 } 813 814 /** 815 * Register a MemstoreFlushListener 816 */ 817 @Override 818 public void registerFlushRequestListener(final FlushRequestListener listener) { 819 this.flushRequestListeners.add(listener); 820 } 821 822 /** 823 * Unregister the listener from MemstoreFlushListeners 824 * @return true when passed listener is unregistered 
successfully. 825 */ 826 @Override 827 public boolean unregisterFlushRequestListener(final FlushRequestListener listener) { 828 return this.flushRequestListeners.remove(listener); 829 } 830 831 /** 832 * Sets the global memstore limit to a new size. 833 */ 834 @Override 835 public void setGlobalMemStoreLimit(long globalMemStoreSize) { 836 this.server.getRegionServerAccounting().setGlobalMemStoreLimits(globalMemStoreSize); 837 reclaimMemStoreMemory(); 838 } 839 840 interface FlushQueueEntry extends Delayed { 841 } 842 843 /** 844 * Datastructure used in the flush queue. Holds region and retry count. Keeps tabs on how old this 845 * object is. Implements {@link Delayed}. On construction, the delay is zero. When added to a 846 * delay queue, we'll come out near immediately. Call {@link #requeue(long)} passing delay in 847 * milliseconds before readding to delay queue if you want it to stay there a while. 848 */ 849 static class FlushRegionEntry implements FlushQueueEntry { 850 private final HRegion region; 851 852 private final long createTime; 853 private long whenToExpire; 854 private int requeueCount = 0; 855 856 private final List<byte[]> families; 857 858 private final FlushLifeCycleTracker tracker; 859 860 FlushRegionEntry(final HRegion r, List<byte[]> families, FlushLifeCycleTracker tracker) { 861 this.region = r; 862 this.createTime = EnvironmentEdgeManager.currentTime(); 863 this.whenToExpire = this.createTime; 864 this.families = families; 865 this.tracker = tracker; 866 } 867 868 /** Returns True if we have been delayed > <code>maximumWait</code> milliseconds. 
*/ 869 public boolean isMaximumWait(final long maximumWait) { 870 return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait; 871 } 872 873 /** Returns True if the entry is a delay flush task */ 874 protected boolean isDelay() { 875 return this.whenToExpire > this.createTime; 876 } 877 878 /** 879 * @return Count of times {@link #requeue(long)} was called; i.e this is number of times we've 880 * been requeued. 881 */ 882 public int getRequeueCount() { 883 return this.requeueCount; 884 } 885 886 public FlushLifeCycleTracker getTracker() { 887 return tracker; 888 } 889 890 /** 891 * @param when When to expire, when to come up out of the queue. Specify in milliseconds. This 892 * method adds EnvironmentEdgeManager.currentTime() to whatever you pass. 893 * @return This. 894 */ 895 public FlushRegionEntry requeue(final long when) { 896 this.whenToExpire = EnvironmentEdgeManager.currentTime() + when; 897 this.requeueCount++; 898 return this; 899 } 900 901 @Override 902 public long getDelay(TimeUnit unit) { 903 return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(), 904 TimeUnit.MILLISECONDS); 905 } 906 907 @Override 908 public int compareTo(Delayed other) { 909 // Delay is compared first. 
If there is a tie, compare region's hash code 910 int ret = 911 Long.valueOf(getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS)) 912 .intValue(); 913 if (ret != 0) { 914 return ret; 915 } 916 FlushQueueEntry otherEntry = (FlushQueueEntry) other; 917 return hashCode() - otherEntry.hashCode(); 918 } 919 920 @Override 921 public String toString() { 922 return "[flush region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()) + "]"; 923 } 924 925 @Override 926 public int hashCode() { 927 int hash = (int) getDelay(TimeUnit.MILLISECONDS); 928 return hash ^ region.hashCode(); 929 } 930 931 @Override 932 public boolean equals(Object obj) { 933 if (this == obj) { 934 return true; 935 } 936 if (obj == null || getClass() != obj.getClass()) { 937 return false; 938 } 939 FlushRegionEntry other = (FlushRegionEntry) obj; 940 if ( 941 !Bytes.equals(this.region.getRegionInfo().getRegionName(), 942 other.region.getRegionInfo().getRegionName()) 943 ) { 944 return false; 945 } 946 return compareTo(other) == 0; 947 } 948 } 949 950 private int getHandlerCount(Configuration conf) { 951 int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2); 952 if (handlerCount < 1) { 953 LOG.warn( 954 "hbase.hstore.flusher.count was configed to {} which is less than 1, " + "corrected to 1", 955 handlerCount); 956 handlerCount = 1; 957 } 958 return handlerCount; 959 } 960 961 @Override 962 public void onConfigurationChange(Configuration newConf) { 963 int newHandlerCount = getHandlerCount(newConf); 964 if (newHandlerCount != flushHandlers.length) { 965 LOG.info("update hbase.hstore.flusher.count from {} to {}", flushHandlers.length, 966 newHandlerCount); 967 lock.writeLock().lock(); 968 try { 969 FlushHandler[] newFlushHandlers = Arrays.copyOf(flushHandlers, newHandlerCount); 970 if (newHandlerCount > flushHandlers.length) { 971 startFlushHandlerThreads(newFlushHandlers, flushHandlers.length, newFlushHandlers.length); 972 } else { 973 
stopFlushHandlerThreads(flushHandlers, newHandlerCount, flushHandlers.length); 974 } 975 flusherIdGen.compareAndSet(flushHandlers.length, newFlushHandlers.length); 976 this.flushHandlers = newFlushHandlers; 977 } finally { 978 lock.writeLock().unlock(); 979 } 980 } 981 } 982 983 private void startFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) { 984 if (flusherThreadFactory != null) { 985 for (int i = start; i < end; i++) { 986 flushHandlers[i] = new FlushHandler("MemStoreFlusher." + flusherIdGen.getAndIncrement()); 987 flusherThreadFactory.newThread(flushHandlers[i]); 988 flushHandlers[i].start(); 989 } 990 } 991 } 992 993 private void stopFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) { 994 for (int i = start; i < end; i++) { 995 flushHandlers[i].shutdown(); 996 if (LOG.isDebugEnabled()) { 997 LOG.debug("send shutdown signal to {}", flushHandlers[i].getName()); 998 } 999 } 1000 } 1001 1002 public int getFlusherCount() { 1003 return flusherIdGen.get(); 1004 } 1005}