/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Flushes the memstore cache on request. NOTE: The flushing work is done by dedicated
 * {@code FlushHandler} threads rather than a Chore because their sleep time can be interrupted
 * when there is something to do, whereas the Chore sleep time is invariant.
 * @see FlushRequester
 */
@InterfaceAudience.Private
public class MemStoreFlusher implements FlushRequester, ConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class);

  private Configuration conf;
  // These two data members go together. Any entry in the one must have
  // a corresponding entry in the other.
  private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>();
  protected final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>();
  private AtomicBoolean wakeupPending = new AtomicBoolean();

  private final long threadWakeFrequency;
  private final HRegionServer server;
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Object blockSignal = new Object();

  private long blockingWaitTime;
  private final LongAdder updatesBlockedMsHighWater = new LongAdder();

  private FlushHandler[] flushHandlers;

  private final AtomicInteger flusherIdGen = new AtomicInteger();

  private ThreadFactory flusherThreadFactory;

  private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);

  /**
   * Singleton instance inserted into the flush queue, used for signaling the flush handlers.
   */
  private static final FlushQueueEntry WAKEUPFLUSH_INSTANCE = new FlushQueueEntry() {
    @Override
    public long getDelay(TimeUnit unit) {
      return 0;
    }

    @Override
    public int compareTo(Delayed o) {
      // Always sort the wakeup token ahead of real flush entries so it is polled first.
      return -1;
    }

    @Override
    public boolean equals(Object obj) {
      return obj == this;
    }

    @Override
    public int hashCode() {
      return 42;
    }
  };
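
  /*
   * A minimal, illustrative sketch (editor's addition, not production code) of why the wakeup
   * token above works: with a delay of 0 it is eligible immediately, and its compareTo() always
   * sorting it first puts it at the head of the DelayQueue, so a handler blocked in poll() wakes
   * up without waiting for a real flush entry.
   *
   *   BlockingQueue<FlushQueueEntry> q = new DelayQueue<>();
   *   q.add(WAKEUPFLUSH_INSTANCE);
   *   FlushQueueEntry e = q.poll(10, TimeUnit.SECONDS); // returns the token immediately
   *   assert e == WAKEUPFLUSH_INSTANCE;
   */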

  /**
   * @param conf   the region server configuration used to size and tune the flusher
   * @param server the region server that owns this flusher; may be null in tests, in which case no
   *               flush handler threads are created
   */
  public MemStoreFlusher(final Configuration conf, final HRegionServer server) {
    super();
    this.conf = conf;
    this.server = server;
    this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000);
    int handlerCount = 0;
    if (server != null) {
      handlerCount = getHandlerCount(conf);
      LOG.info("globalMemStoreLimit="
        + TraditionalBinaryPrefix
          .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
        + ", globalMemStoreLimitLowMark="
        + TraditionalBinaryPrefix.long2String(
          this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)
        + ", Offheap=" + (this.server.getRegionServerAccounting().isOffheap()));
    }
    this.flushHandlers = new FlushHandler[handlerCount];
  }

  public LongAdder getUpdatesBlockedMsHighWater() {
    return this.updatesBlockedMsHighWater;
  }

  /**
   * The memstore across all regions has exceeded the low water mark. Pick one region to flush and
   * flush it synchronously (this is called from the flush thread)
   * @return true if successful
   */
  private boolean flushOneForGlobalPressure(FlushType flushType) {
    SortedMap<Long, HRegion> regionsBySize = null;
    switch (flushType) {
      case ABOVE_OFFHEAP_HIGHER_MARK:
      case ABOVE_OFFHEAP_LOWER_MARK:
        regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize();
        break;
      case ABOVE_ONHEAP_HIGHER_MARK:
      case ABOVE_ONHEAP_LOWER_MARK:
      default:
        regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize();
    }
    Set<HRegion> excludedRegions = new HashSet<>();

    double secondaryMultiplier =
      ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);

    boolean flushedOne = false;
    while (!flushedOne) {
      // Find the biggest region that doesn't have too many storefiles (might be null!)
      HRegion bestFlushableRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, true);
      // Find the biggest region, total, even if it might have too many store files.
      HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false);
      // Find the biggest region that is a secondary region
      HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions);
      if (bestAnyRegion == null) {
        // If bestAnyRegion is null, assign replica. It may be null too. Next step is check for null
        bestAnyRegion = bestRegionReplica;
      }
      if (bestAnyRegion == null) {
        LOG.error("Above memory mark but there are no flushable regions!");
        return false;
      }

      HRegion regionToFlush;
      long bestAnyRegionSize;
      long bestFlushableRegionSize;
      switch (flushType) {
        case ABOVE_OFFHEAP_HIGHER_MARK:
        case ABOVE_OFFHEAP_LOWER_MARK:
          bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize();
          bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion);
          break;

        case ABOVE_ONHEAP_HIGHER_MARK:
        case ABOVE_ONHEAP_LOWER_MARK:
          bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize();
          bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion);
          break;

        default:
          bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize();
          bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion);
      }
      if (bestAnyRegionSize > 2 * bestFlushableRegionSize) {
        // Even if it's not supposed to be flushed, pick a region if it's more than twice
        // as big as the best flushable one - otherwise when we're under pressure we make
        // lots of little flushes and cause lots of compactions, etc, which just makes
        // life worse!
        if (LOG.isDebugEnabled()) {
          LOG.debug("Under global heap pressure: " + "Region "
            + bestAnyRegion.getRegionInfo().getRegionNameAsString() + " has too many "
            + "store files, but is " + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1)
            + " vs best flushable region's "
            + TraditionalBinaryPrefix.long2String(bestFlushableRegionSize, "", 1)
            + ". Choosing the bigger.");
        }
        regionToFlush = bestAnyRegion;
      } else {
        if (bestFlushableRegion == null) {
          regionToFlush = bestAnyRegion;
        } else {
          regionToFlush = bestFlushableRegion;
        }
      }

      long regionToFlushSize;
      long bestRegionReplicaSize;
      switch (flushType) {
        case ABOVE_OFFHEAP_HIGHER_MARK:
        case ABOVE_OFFHEAP_LOWER_MARK:
          regionToFlushSize = regionToFlush.getMemStoreOffHeapSize();
          bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica);
          break;

        case ABOVE_ONHEAP_HIGHER_MARK:
        case ABOVE_ONHEAP_LOWER_MARK:
          regionToFlushSize = regionToFlush.getMemStoreHeapSize();
          bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica);
          break;

        default:
          regionToFlushSize = regionToFlush.getMemStoreDataSize();
          bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica);
      }

      if ((regionToFlush == null || regionToFlushSize == 0) && bestRegionReplicaSize == 0) {
        // A concurrency issue (such as a region split) may mean that a region returned by
        // getCopyOfOnlineRegionsSortedByXX() is no longer eligible according to
        // getBiggestMemStoreRegion(), in which case we can exit the loop here.
        LOG.debug("Above memory mark but there is no flushable region");
        return false;
      }

      if (
        regionToFlush == null || (bestRegionReplica != null
          && ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf)
          && (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize))
      ) {
        LOG.info("Refreshing storefiles of region " + bestRegionReplica
          + " due to global heap pressure. Total memstore off heap size="
          + TraditionalBinaryPrefix
            .long2String(server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1)
          + " memstore heap size=" + TraditionalBinaryPrefix
            .long2String(server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1));
        flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
        if (!flushedOne) {
          LOG.info("Excluding secondary region " + bestRegionReplica
            + " - trying to find a different region to refresh files.");
          excludedRegions.add(bestRegionReplica);
        }
      } else {
        LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
          + "Flush type=" + flushType.toString() + ", Total Memstore Heap size="
          + TraditionalBinaryPrefix
            .long2String(server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1)
          + ", Total Memstore Off-Heap size="
          + TraditionalBinaryPrefix
            .long2String(server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1)
          + ", Region memstore size="
          + TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
        flushedOne = flushRegion(regionToFlush, true, null, FlushLifeCycleTracker.DUMMY);

        if (!flushedOne) {
          LOG.info("Excluding unflushable region " + regionToFlush
            + " - trying to find a different region to flush.");
          excludedRegions.add(regionToFlush);
        }
      }
    }
    return true;
  }

  /** Returns the memstore off-heap size, or 0 if <code>r</code> is null. */
  private static long getMemStoreOffHeapSize(HRegion r) {
    return r == null ? 0 : r.getMemStoreOffHeapSize();
  }

  /** Returns the memstore heap size, or 0 if <code>r</code> is null. */
  private static long getMemStoreHeapSize(HRegion r) {
    return r == null ? 0 : r.getMemStoreHeapSize();
  }

  /** Returns the memstore data size, or 0 if <code>r</code> is null. */
  private static long getMemStoreDataSize(HRegion r) {
    return r == null ? 0 : r.getMemStoreDataSize();
  }

  private class FlushHandler extends Thread {

    private final AtomicBoolean running = new AtomicBoolean(true);

    private FlushHandler(String name) {
      super(name);
    }

    @Override
    public void run() {
      while (!server.isStopped() && running.get()) {
        FlushQueueEntry fqe = null;
        try {
          wakeupPending.set(false); // allow someone to wake us up again
          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) {
            FlushType type = isAboveLowWaterMark();
            if (type != FlushType.NORMAL) {
              LOG.debug("Flush thread woke up because memory above low water="
                + TraditionalBinaryPrefix.long2String(
                  server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1));
              // For an offheap memstore, even if the lower water mark was breached due to heap
              // overhead, we still select the regions based on the region's memstore data size.
              // TODO: If we want to decide based on heap overhead, it can be done without tracking
              // it per region.
              if (!flushOneForGlobalPressure(type)) {
                // Wasn't able to flush any region, but we're above low water mark
                // This is unlikely to happen, but might happen when closing the
                // entire server - another thread is flushing regions. We'll just
                // sleep a little bit to avoid spinning, and then pretend that
                // we flushed one, so anyone blocked will check again
                Thread.sleep(1000);
                wakeUpIfBlocking();
              }
              // Enqueue another one of these tokens so we'll wake up again
              wakeupFlushThread();
            }
            continue;
          }
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
          if (!flushRegion(fre)) {
            break;
          }
        } catch (InterruptedException ex) {
          continue;
        } catch (ConcurrentModificationException ex) {
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }

      if (server.isStopped()) {
        synchronized (regionsInQueue) {
          regionsInQueue.clear();
          flushQueue.clear();
        }

        // Signal anyone waiting, so they see the close flag
        wakeUpIfBlocking();
      }
      LOG.info(getName() + " exiting");
    }

    public void shutdown() {
      if (!running.compareAndSet(true, false)) {
        LOG.warn("{} is already signaled to shutdown", getName());
      }
    }
  }

  private void wakeupFlushThread() {
    if (wakeupPending.compareAndSet(false, true)) {
      flushQueue.add(WAKEUPFLUSH_INSTANCE);
    }
  }

  private HRegion getBiggestMemStoreRegion(SortedMap<Long, HRegion> regionsBySize,
    Set<HRegion> excludedRegions, boolean checkStoreFileCount) {
    synchronized (regionsInQueue) {
      for (HRegion region : regionsBySize.values()) {
        if (excludedRegions.contains(region)) {
          continue;
        }

        if (region.writestate.flushing || !region.writestate.writesEnabled) {
          continue;
        }

        if (checkStoreFileCount && isTooManyStoreFiles(region)) {
          continue;
        }
        return region;
      }
    }
    return null;
  }

  private HRegion getBiggestMemStoreOfRegionReplica(SortedMap<Long, HRegion> regionsBySize,
    Set<HRegion> excludedRegions) {
    synchronized (regionsInQueue) {
      for (HRegion region : regionsBySize.values()) {
        if (excludedRegions.contains(region)) {
          continue;
        }

        if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
          continue;
        }

        return region;
      }
    }
    return null;
  }

  private boolean refreshStoreFilesAndReclaimMemory(Region region) {
    try {
      return region.refreshStoreFiles();
    } catch (IOException e) {
      LOG.warn("Refreshing store files failed with exception", e);
    }
    return false;
  }

  /**
   * Return the FlushType indicating which high watermark (on-heap or off-heap) has been exceeded,
   * or FlushType.NORMAL if global memory usage is below the high watermark.
   */
  private FlushType isAboveHighWaterMark() {
    return server.getRegionServerAccounting().isAboveHighWaterMark();
  }

  /**
   * Return the FlushType indicating which low watermark (on-heap or off-heap) has been exceeded,
   * or FlushType.NORMAL if global memory usage is below the low watermark.
   */
  private FlushType isAboveLowWaterMark() {
    return server.getRegionServerAccounting().isAboveLowWaterMark();
  }

  @Override
  public boolean requestFlush(HRegion r, FlushLifeCycleTracker tracker) {
    return this.requestFlush(r, null, tracker);
  }

  @Override
  public boolean requestFlush(HRegion r, List<byte[]> families, FlushLifeCycleTracker tracker) {
    synchronized (regionsInQueue) {
      FlushRegionEntry existFqe = regionsInQueue.get(r);
      if (existFqe != null) {
        // If a delayed entry exists and has not yet reached its execution time, just remove it.
        if (existFqe.isDelay() && existFqe.whenToExpire > EnvironmentEdgeManager.currentTime()) {
          LOG.info("Remove the existing delayed flush entry for {}, "
            + "because we need to flush it immediately", r);
          this.regionsInQueue.remove(r);
          this.flushQueue.remove(existFqe);
          r.decrementFlushesQueuedCount();
        } else {
          tracker.notExecuted("Flush already requested on " + r);
          return false;
        }
      }

      // This entry has no delay so it will be added at the top of the flush
      // queue. It'll come out near immediately.
      FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker);
      this.regionsInQueue.put(r, fqe);
      this.flushQueue.add(fqe);
      r.incrementFlushesQueuedCount();
      return true;
    }
  }

  @Override
  public boolean requestDelayedFlush(HRegion r, long delay) {
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has some delay
        FlushRegionEntry fqe = new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
        fqe.requeue(delay);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
        r.incrementFlushesQueuedCount();
        return true;
      }
      return false;
    }
  }
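
  /*
   * A minimal usage sketch (editor's addition, not part of this class): how callers typically
   * schedule flushes through the FlushRequester interface implemented here. The way the flusher is
   * obtained below is an assumption for illustration; inside the region server it is simply the
   * MemStoreFlusher instance owned by HRegionServer.
   *
   *   FlushRequester flusher = ...; // e.g. the region server's flush requester
   *   flusher.requestFlush(region, FlushLifeCycleTracker.DUMMY);  // flush all stores soon
   *   flusher.requestDelayedFlush(region, 30 * 1000);             // flush after ~30s unless requested sooner
   */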

  public int getFlushQueueSize() {
    return flushQueue.size();
  }

  /**
   * Only interrupt once it's done with a run through the work loop.
   */
  void interruptIfNecessary() {
    lock.writeLock().lock();
    try {
      for (FlushHandler flushHandler : flushHandlers) {
        if (flushHandler != null) {
          flushHandler.interrupt();
        }
      }
    } finally {
      lock.writeLock().unlock();
    }
  }

  synchronized void start(UncaughtExceptionHandler eh) {
    this.flusherThreadFactory =
      new ThreadFactoryBuilder().setDaemon(true).setUncaughtExceptionHandler(eh).build();
    lock.readLock().lock();
    try {
      startFlushHandlerThreads(flushHandlers, 0, flushHandlers.length);
    } finally {
      lock.readLock().unlock();
    }
  }

  boolean isAlive() {
    lock.readLock().lock();
    try {
      for (FlushHandler flushHandler : flushHandlers) {
        if (flushHandler != null && flushHandler.isAlive()) {
          return true;
        }
      }
      return false;
    } finally {
      lock.readLock().unlock();
    }
  }

  void shutdown() {
    lock.readLock().lock();
    try {
      for (FlushHandler flushHandler : flushHandlers) {
        if (flushHandler != null) {
          Threads.shutdown(flushHandler);
        }
      }
    } finally {
      lock.readLock().unlock();
    }
  }

  /**
   * A flushRegion variant that checks the store file count. If there are too many, it puts the
   * flush back on the delay queue to retry later.
   * @return true if the region was successfully flushed, false otherwise. If false, there will be
   *         accompanying log messages explaining why the region was not flushed.
   */
  private boolean flushRegion(final FlushRegionEntry fqe) {
    HRegion region = fqe.region;
    if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) {
      if (fqe.isMaximumWait(this.blockingWaitTime)) {
        LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime)
          + "ms on a compaction to clean up 'too many store files'; waited "
          + "long enough... proceeding with flush of "
          + region.getRegionInfo().getRegionNameAsString());
      } else {
        // If this is the first time we've been put off, then emit a log message.
        if (fqe.getRequeueCount() <= 0) {
          // Note: We don't impose blockingStoreFiles constraint on meta regions
          LOG.warn("{} has too many store files ({}); delaying flush up to {} ms",
            region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
            this.blockingWaitTime);
          final CompactSplit compactSplitThread = server.getCompactSplitThread();
          if (!compactSplitThread.requestSplit(region)) {
            try {
              compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
            } catch (IOException e) {
              e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
              LOG.error("Cache flush failed for region "
                + Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e);
            }
          }
        }

        // Put back on the queue. Have it come back out of the queue
        // after a delay of this.blockingWaitTime / 100 ms.
        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
        // Tell a lie; it's not flushed yet, but that's ok.
        return true;
      }
    }
    return flushRegion(region, false, fqe.families, fqe.getTracker());
  }

  /**
   * Flush a region.
   * @param region         Region to flush.
   * @param emergencyFlush Set if we are being force flushed. If true, the region still needs to be
   *                       removed from the flush queue. If false, we were called from the main
   *                       flusher run loop and got the entry to flush by calling poll on the flush
   *                       queue (which already removed it).
   * @param families       stores of the region to flush.
   * @return true if the region was successfully flushed, false otherwise. If false, there will be
   *         accompanying log messages explaining why the region was not flushed.
   */
  private boolean flushRegion(HRegion region, boolean emergencyFlush, List<byte[]> families,
    FlushLifeCycleTracker tracker) {
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      if (fqe != null && emergencyFlush) {
        // Need to remove the region's entry from the delay queue. When NOT an
        // emergencyFlush, the item was already removed via flushQueue.poll.
        flushQueue.remove(fqe);
      }
    }

    tracker.beforeExecution();
    lock.readLock().lock();
    final CompactSplit compactSplitThread = server.getCompactSplitThread();
    try {
      notifyFlushRequest(region, emergencyFlush);
      FlushResult flushResult = region.flushcache(families, false, tracker);
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      boolean shouldSplit = region.checkSplit().isPresent();
      if (shouldSplit) {
        compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
      LOG.error("Cache flush failed" + (region != null
        ? (" for region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()))
        : ""), ex);
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      lock.readLock().unlock();
      wakeUpIfBlocking();
      tracker.afterExecution();
    }
    return true;
  }

  private void notifyFlushRequest(Region region, boolean emergencyFlush) {
    FlushType type = null;
    if (emergencyFlush) {
      type = isAboveHighWaterMark();
    }
    if (type == null) {
      type = isAboveLowWaterMark();
    }
    for (FlushRequestListener listener : flushRequestListeners) {
      listener.flushRequested(type, region);
    }
  }

  private void wakeUpIfBlocking() {
    synchronized (blockSignal) {
      blockSignal.notifyAll();
    }
  }

  private boolean isTooManyStoreFiles(Region region) {

    // When compaction is disabled, the region is flushable
    if (!region.getTableDescriptor().isCompactionEnabled()) {
      return false;
    }

    for (Store store : region.getStores()) {
      if (store.hasTooManyStoreFiles()) {
        return true;
      }
    }
    return false;
  }
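
  /*
   * Editor's note: Store#hasTooManyStoreFiles() is what throttles flushes here; it is governed by
   * the blocking store files setting. A hedged configuration sketch (the key name is assumed from
   * the standard HBase configuration, not defined in this file):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setInt("hbase.hstore.blockingStoreFiles", 16); // flushes are delayed once a store has more files
   */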

  private int getStoreFileCount(Region region) {
    int count = 0;
    for (Store store : region.getStores()) {
      count += store.getStorefilesCount();
    }
    return count;
  }

  /**
   * Check if the regionserver's memstore memory usage is greater than the limit. If so, flush
   * regions with the biggest memstores until we're down to the lower limit. This method blocks
   * callers until we're down to a safe amount of memstore consumption.
   */
  public void reclaimMemStoreMemory() {
    Span span =
      TraceUtil.getGlobalTracer().spanBuilder("MemStoreFlusher.reclaimMemStoreMemory").startSpan();
    try (Scope scope = span.makeCurrent()) {
      FlushType flushType = isAboveHighWaterMark();
      if (flushType != FlushType.NORMAL) {
        span.addEvent("Force Flush. We're above high water mark.");
        long start = EnvironmentEdgeManager.currentTime();
        long nextLogTimeMs = start;
        synchronized (this.blockSignal) {
          boolean blocked = false;
          long startTime = 0;
          boolean interrupted = false;
          try {
            flushType = isAboveHighWaterMark();
            while (flushType != FlushType.NORMAL && !server.isStopped()) {
              if (!blocked) {
                startTime = EnvironmentEdgeManager.currentTime();
                if (!server.getRegionServerAccounting().isOffheap()) {
                  logMsg("global memstore heapsize",
                    server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
                    server.getRegionServerAccounting().getGlobalMemStoreLimit());
                } else {
                  switch (flushType) {
                    case ABOVE_OFFHEAP_HIGHER_MARK:
                      logMsg("the global offheap memstore datasize",
                        server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
                        server.getRegionServerAccounting().getGlobalMemStoreLimit());
                      break;
                    case ABOVE_ONHEAP_HIGHER_MARK:
                      logMsg("global memstore heapsize",
                        server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
                        server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit());
                      break;
                    default:
                      break;
                  }
                }
              }
              blocked = true;
              wakeupFlushThread();
              try {
                // we should be able to wait forever, but we've seen a bug where
                // we miss a notify, so put a 5 second bound on it at least.
                blockSignal.wait(5 * 1000);
              } catch (InterruptedException ie) {
                LOG.warn("Interrupted while waiting");
                interrupted = true;
              }
              long nowMs = EnvironmentEdgeManager.currentTime();
              if (nowMs >= nextLogTimeMs) {
                LOG.warn("Memstore is above high water mark; updates have been blocked for {} ms",
                  nowMs - start);
                nextLogTimeMs = nowMs + 1000;
              }
              flushType = isAboveHighWaterMark();
            }
          } finally {
            if (interrupted) {
              Thread.currentThread().interrupt();
            }
          }

          if (blocked) {
            final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
            if (totalTime > 0) {
              this.updatesBlockedMsHighWater.add(totalTime);
            }
            LOG.info("Unblocking updates for server " + server.toString());
          }
        }
      } else {
        flushType = isAboveLowWaterMark();
        if (flushType != FlushType.NORMAL) {
          wakeupFlushThread();
        }
      }
    } finally {
      // Make sure the span is ended on every code path.
      span.end();
    }
  }

  private void logMsg(String type, long val, long max) {
    LOG.info("Blocking updates: {} {} is >= blocking {}", type,
      TraditionalBinaryPrefix.long2String(val, "", 1),
      TraditionalBinaryPrefix.long2String(max, "", 1));
  }

  @Override
  public String toString() {
    return "flush_queue=" + flushQueue.size();
  }

  public String dumpQueue() {
    StringBuilder queueList = new StringBuilder();
    queueList.append("Flush Queue dump:\n");
    queueList.append("  Flush Queue:\n");
    java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();

    while (it.hasNext()) {
      queueList.append("    " + it.next().toString());
      queueList.append("\n");
    }

    return queueList.toString();
  }

  /**
   * Register a FlushRequestListener to be notified when a flush is requested.
   */
  @Override
  public void registerFlushRequestListener(final FlushRequestListener listener) {
    this.flushRequestListeners.add(listener);
  }

  /**
   * Unregister the given FlushRequestListener.
   * @return true when the passed listener was unregistered successfully.
   */
  @Override
  public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
    return this.flushRequestListeners.remove(listener);
  }
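
  /*
   * A small sketch (editor's addition) of wiring up a FlushRequestListener. The only thing assumed
   * about the listener interface is the flushRequested(FlushType, Region) callback, which is the
   * method invoked from notifyFlushRequest(...) above.
   *
   *   memStoreFlusher.registerFlushRequestListener(new FlushRequestListener() {
   *     @Override
   *     public void flushRequested(FlushType type, Region region) {
   *       LOG.debug("Flush requested for {} due to {}", region, type);
   *     }
   *   });
   */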

  /**
   * Sets the global memstore limit to a new size.
   */
  @Override
  public void setGlobalMemStoreLimit(long globalMemStoreSize) {
    this.server.getRegionServerAccounting().setGlobalMemStoreLimits(globalMemStoreSize);
    reclaimMemStoreMemory();
  }

  interface FlushQueueEntry extends Delayed {
  }

  /**
   * Data structure used in the flush queue. Holds region and retry count. Keeps tabs on how old
   * this object is. Implements {@link Delayed}. On construction, the delay is zero; when added to a
   * delay queue, it comes out near immediately. Call {@link #requeue(long)} passing a delay in
   * milliseconds before re-adding to the delay queue if you want it to stay there a while.
   */
  static class FlushRegionEntry implements FlushQueueEntry {
    private final HRegion region;

    private final long createTime;
    private long whenToExpire;
    private int requeueCount = 0;

    private final List<byte[]> families;

    private final FlushLifeCycleTracker tracker;

    FlushRegionEntry(final HRegion r, List<byte[]> families, FlushLifeCycleTracker tracker) {
      this.region = r;
      this.createTime = EnvironmentEdgeManager.currentTime();
      this.whenToExpire = this.createTime;
      this.families = families;
      this.tracker = tracker;
    }

    /** Returns True if we have been delayed > <code>maximumWait</code> milliseconds. */
    public boolean isMaximumWait(final long maximumWait) {
      return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
    }

    /** Returns True if the entry is a delayed flush task. */
    protected boolean isDelay() {
      return this.whenToExpire > this.createTime;
    }

    /**
     * @return Count of times {@link #requeue(long)} was called; i.e. this is the number of times
     *         we've been requeued.
     */
    public int getRequeueCount() {
      return this.requeueCount;
    }

    public FlushLifeCycleTracker getTracker() {
      return tracker;
    }

    /**
     * @param when When to expire, when to come up out of the queue. Specify in milliseconds. This
     *             method adds EnvironmentEdgeManager.currentTime() to whatever you pass.
     * @return This.
     */
    public FlushRegionEntry requeue(final long when) {
      this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
      this.requeueCount++;
      return this;
    }

    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
        TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
      // Delay is compared first. If there is a tie, compare the entries' hash codes.
      // Long.compare avoids the int truncation/overflow of subtracting the two delays.
      int ret =
        Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
      if (ret != 0) {
        return ret;
      }
      FlushQueueEntry otherEntry = (FlushQueueEntry) other;
      return Integer.compare(hashCode(), otherEntry.hashCode());
    }

    @Override
    public String toString() {
      return "[flush region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()) + "]";
    }

    @Override
    public int hashCode() {
      int hash = (int) getDelay(TimeUnit.MILLISECONDS);
      return hash ^ region.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (obj == null || getClass() != obj.getClass()) {
        return false;
      }
      FlushRegionEntry other = (FlushRegionEntry) obj;
      if (
        !Bytes.equals(this.region.getRegionInfo().getRegionName(),
          other.region.getRegionInfo().getRegionName())
      ) {
        return false;
      }
      return compareTo(other) == 0;
    }
  }
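
  /*
   * Illustrative sketch (editor's addition) of the Delayed semantics above: a fresh entry expires
   * immediately, while requeue(delay) pushes its re-emergence from the DelayQueue into the future,
   * which is how flushRegion(FlushRegionEntry) backs off when a region has too many store files.
   * Assume "region" is some online HRegion.
   *
   *   FlushRegionEntry entry = new FlushRegionEntry(region, null, FlushLifeCycleTracker.DUMMY);
   *   entry.getDelay(TimeUnit.MILLISECONDS);  // <= 0, so it comes out of the queue right away
   *   entry.requeue(90000 / 100);             // re-emerges roughly 900 ms later
   *   entry.getRequeueCount();                // 1
   */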

  private int getHandlerCount(Configuration conf) {
    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
    if (handlerCount < 1) {
      LOG.warn(
        "hbase.hstore.flusher.count was configured to {} which is less than 1, corrected to 1",
        handlerCount);
      handlerCount = 1;
    }
    return handlerCount;
  }
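
  /*
   * A hedged configuration sketch (editor's addition): the number of flusher threads is read from
   * hbase.hstore.flusher.count (values below 1 are corrected to 1 above), and
   * onConfigurationChange(...) below resizes the handler pool when the value changes online.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setInt("hbase.hstore.flusher.count", 4); // run four MemStoreFlusher.N handler threads
   */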

  @Override
  public void onConfigurationChange(Configuration newConf) {
    int newHandlerCount = getHandlerCount(newConf);
    if (newHandlerCount != flushHandlers.length) {
      LOG.info("update hbase.hstore.flusher.count from {} to {}", flushHandlers.length,
        newHandlerCount);
      lock.writeLock().lock();
      try {
        FlushHandler[] newFlushHandlers = Arrays.copyOf(flushHandlers, newHandlerCount);
        if (newHandlerCount > flushHandlers.length) {
          startFlushHandlerThreads(newFlushHandlers, flushHandlers.length, newFlushHandlers.length);
        } else {
          stopFlushHandlerThreads(flushHandlers, newHandlerCount, flushHandlers.length);
        }
        flusherIdGen.compareAndSet(flushHandlers.length, newFlushHandlers.length);
        this.flushHandlers = newFlushHandlers;
      } finally {
        lock.writeLock().unlock();
      }
    }
  }

  private void startFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) {
    if (flusherThreadFactory != null) {
      for (int i = start; i < end; i++) {
        flushHandlers[i] = new FlushHandler("MemStoreFlusher." + flusherIdGen.getAndIncrement());
        flusherThreadFactory.newThread(flushHandlers[i]);
        flushHandlers[i].start();
      }
    }
  }

  private void stopFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) {
    for (int i = start; i < end; i++) {
      flushHandlers[i].shutdown();
      if (LOG.isDebugEnabled()) {
        LOG.debug("send shutdown signal to {}", flushHandlers[i].getName());
      }
    }
  }

  public int getFlusherCount() {
    return flusherIdGen.get();
  }
}