/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.region;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Since there is no RegionServerServices for a master local region, we need to implement the
 * flush and compaction logic ourselves.
 * <p/>
 * The flush logic is simple: every time after calling a modification method in
 * {@link RegionProcedureStore}, the {@link #onUpdate()} method below is called. It checks the
 * memstore size and the number of changes since the last flush; if either is above its threshold,
 * a flush is requested and the flush thread calls {@link HRegion#flush(boolean)} to force flush
 * all stores. A flush is also triggered periodically if none has happened within the configured
 * flush interval.
 * <p/>
 * The compaction logic is also simple: after each flush we check the store file count, and if any
 * store has at least compactMin store files, we trigger a major compaction.
 */
@InterfaceAudience.Private
class MasterRegionFlusherAndCompactor implements Closeable {

  private static final Logger LOG =
    LoggerFactory.getLogger(MasterRegionFlusherAndCompactor.class);

  private final Configuration conf;

  private final Abortable abortable;

  private final HRegion region;

  // We can only count this outside the region's write/flush process, so the value is not
  // accurate, but it is good enough.
  private final AtomicLong changesAfterLastFlush = new AtomicLong(0);

  private final long flushSize;

  private final long flushPerChanges;

  private final long flushIntervalMs;

  private final int compactMin;

  private final Path globalArchivePath;

  private final String archivedHFileSuffix;

  private final Thread flushThread;

  private final Lock flushLock = new ReentrantLock();

  private final Condition flushCond = flushLock.newCondition();

  private boolean flushRequest = false;

  private long lastFlushTime;

  private final ExecutorService compactExecutor;

  private final Lock compactLock = new ReentrantLock();

  private boolean compactRequest = false;

  private volatile boolean closed = false;

  MasterRegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region,
    long flushSize, long flushPerChanges, long flushIntervalMs, int compactMin,
    Path globalArchivePath, String archivedHFileSuffix) {
    this.conf = conf;
    this.abortable = abortable;
    this.region = region;
    this.flushSize = flushSize;
    this.flushPerChanges = flushPerChanges;
    this.flushIntervalMs = flushIntervalMs;
    this.compactMin = compactMin;
    this.globalArchivePath = globalArchivePath;
    this.archivedHFileSuffix = archivedHFileSuffix;
    flushThread = new Thread(this::flushLoop, region.getRegionInfo().getTable() + "-Flusher");
    flushThread.setDaemon(true);
    flushThread.start();
    compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
      .setNameFormat(region.getRegionInfo().getTable() + "-Store-Compactor").setDaemon(true)
      .build());
    LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, compactMin={}",
      flushSize, flushPerChanges, flushIntervalMs, compactMin);
  }

  // Inject our flush related configurations into the region's configuration.
  static void setupConf(Configuration conf, long flushSize, long flushPerChanges,
    long flushIntervalMs) {
    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSize);
    conf.setLong(HRegion.MEMSTORE_FLUSH_PER_CHANGES, flushPerChanges);
    conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, flushIntervalMs);
    LOG.info("Injected flushSize={}, flushPerChanges={}, flushIntervalMs={}", flushSize,
      flushPerChanges, flushIntervalMs);
  }

  private void moveHFileToGlobalArchiveDir() throws IOException {
    FileSystem fs = region.getRegionFileSystem().getFileSystem();
    for (HStore store : region.getStores()) {
      store.closeAndArchiveCompactedFiles();
      Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, region.getRegionInfo(),
        store.getColumnFamilyDescriptor().getName());
      Path globalStoreArchiveDir = HFileArchiveUtil.getStoreArchivePathForArchivePath(
        globalArchivePath, region.getRegionInfo(), store.getColumnFamilyDescriptor().getName());
      try {
        if (fs.exists(storeArchiveDir)) {
          MasterRegionUtils.moveFilesUnderDir(fs, storeArchiveDir, globalStoreArchiveDir,
            archivedHFileSuffix);
        } else {
          LOG.warn(
            "Archived dir {} does not exist, there is no need to move archived hfiles from {} "
              + "to global dir {}.",
            storeArchiveDir, storeArchiveDir, globalStoreArchiveDir);
        }
      } catch (IOException e) {
        LOG.warn("Failed to move archived hfiles from {} to global dir {}", storeArchiveDir,
          globalStoreArchiveDir, e);
      }
    }
  }

  private void compact() {
    try {
      region.compact(true);
      moveHFileToGlobalArchiveDir();
    } catch (IOException e) {
      LOG.error("Failed to compact master local region", e);
    }
    compactLock.lock();
    try {
      if (needCompaction()) {
        compactExecutor.execute(this::compact);
      } else {
        compactRequest = false;
      }
    } finally {
      compactLock.unlock();
    }
  }

  private boolean needCompaction() {
    for (Store store : region.getStores()) {
      if (store.getStorefilesCount() >= compactMin) {
        return true;
      }
    }
    return false;
  }

  private void flushLoop() {
    recordLastFlushTime();
    while (!closed) {
      flushLock.lock();
      try {
        while (!flushRequest) {
          long waitTimeMs = lastFlushTime + flushIntervalMs - EnvironmentEdgeManager.currentTime();
          if (waitTimeMs <= 0) {
            flushRequest = true;
            break;
          }
          flushCond.await(waitTimeMs, TimeUnit.MILLISECONDS);
          if (closed) {
            return;
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        continue;
      } finally {
        flushLock.unlock();
      }
      assert flushRequest;
      resetChangesAfterLastFlush();
      try {
        region.flush(true);
        recordLastFlushTime();
      } catch (IOException e) {
        LOG.error(HBaseMarkers.FATAL, "Failed to flush master local region, aborting...", e);
        abortable.abort("Failed to flush master local region", e);
        return;
      }
      compactLock.lock();
      try {
        if (!compactRequest && needCompaction()) {
          compactRequest = true;
          compactExecutor.execute(this::compact);
        }
      } finally {
        compactLock.unlock();
      }
      flushLock.lock();
      try {
        // reset the flushRequest flag
        if (!shouldFlush(changesAfterLastFlush.get())) {
          flushRequest = false;
        }
      } finally {
        flushLock.unlock();
      }
    }
  }

  private boolean shouldFlush(long changes) {
    long heapSize = region.getMemStoreHeapSize();
    long offHeapSize = region.getMemStoreOffHeapSize();
    boolean flush = heapSize + offHeapSize >= flushSize || changes > flushPerChanges;
    if (flush && LOG.isTraceEnabled()) {
      LOG.trace("shouldFlush totalMemStoreSize={}, flushSize={}, changes={}, flushPerChanges={}",
        heapSize + offHeapSize, flushSize, changes, flushPerChanges);
    }
    return flush;
  }

  void onUpdate() {
    long changes = changesAfterLastFlush.incrementAndGet();
    if (shouldFlush(changes)) {
      requestFlush();
    }
  }

  void requestFlush() {
    flushLock.lock();
    try {
      if (flushRequest) {
        return;
      }
      flushRequest = true;
      flushCond.signalAll();
    } finally {
      flushLock.unlock();
    }
  }

  void resetChangesAfterLastFlush() {
    changesAfterLastFlush.set(0);
  }

  void recordLastFlushTime() {
    lastFlushTime = EnvironmentEdgeManager.currentTime();
  }

  @Override
  public void close() {
    closed = true;
    flushThread.interrupt();
    compactExecutor.shutdown();
  }
}