001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.cleaner; 019 020import java.util.concurrent.ThreadPoolExecutor; 021import java.util.concurrent.TimeUnit; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.conf.ConfigurationObserver; 024import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 025import org.apache.hadoop.hbase.util.Threads; 026import org.apache.yetus.audience.InterfaceAudience; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 031 032/** 033 * The thread pool used for scan directories 034 */ 035@InterfaceAudience.Private 036public class DirScanPool implements ConfigurationObserver { 037 private static final Logger LOG = LoggerFactory.getLogger(DirScanPool.class); 038 private volatile int size; 039 private final ThreadPoolExecutor pool; 040 private int cleanerLatch; 041 private boolean reconfigNotification; 042 private Type dirScanPoolType; 043 private final String name; 044 045 private enum Type { 046 LOG_CLEANER(CleanerChore.LOG_CLEANER_CHORE_SIZE, 047 CleanerChore.DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE), 048 HFILE_CLEANER(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE); 049 050 private final String cleanerPoolSizeConfigName; 051 private final String cleanerPoolSizeConfigDefault; 052 053 private Type(String cleanerPoolSizeConfigName, String cleanerPoolSizeConfigDefault) { 054 this.cleanerPoolSizeConfigName = cleanerPoolSizeConfigName; 055 this.cleanerPoolSizeConfigDefault = cleanerPoolSizeConfigDefault; 056 } 057 } 058 059 private DirScanPool(Configuration conf, Type dirScanPoolType) { 060 this(dirScanPoolType, conf.get(dirScanPoolType.cleanerPoolSizeConfigName, 061 dirScanPoolType.cleanerPoolSizeConfigDefault)); 062 } 063 064 private DirScanPool(Type dirScanPoolType, String poolSize) { 065 this.dirScanPoolType = dirScanPoolType; 066 this.name = dirScanPoolType.name().toLowerCase(); 067 size = CleanerChore.calculatePoolSize(poolSize); 068 // poolSize may be 0 or 0.0 from a careless configuration, 069 // double check to make sure. 070 size = size == 0 071 ? CleanerChore.calculatePoolSize(dirScanPoolType.cleanerPoolSizeConfigDefault) 072 : size; 073 pool = initializePool(size, name); 074 LOG.info("{} Cleaner pool size is {}", name, size); 075 cleanerLatch = 0; 076 } 077 078 private static ThreadPoolExecutor initializePool(int size, String name) { 079 return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES, 080 new ThreadFactoryBuilder().setNameFormat(name + "-dir-scan-pool-%d").setDaemon(true) 081 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 082 } 083 084 /** 085 * Checks if pool can be updated. If so, mark for update later. 086 * @param conf configuration 087 */ 088 @Override 089 public synchronized void onConfigurationChange(Configuration conf) { 090 int newSize = CleanerChore.calculatePoolSize(conf.get(dirScanPoolType.cleanerPoolSizeConfigName, 091 dirScanPoolType.cleanerPoolSizeConfigDefault)); 092 if (newSize == size) { 093 LOG.trace("{} Cleaner Size from configuration is same as previous={}, no need to update.", 094 name, newSize); 095 return; 096 } 097 size = newSize; 098 // Chore is working, update it later. 099 reconfigNotification = true; 100 } 101 102 synchronized void latchCountUp() { 103 cleanerLatch++; 104 } 105 106 synchronized void latchCountDown() { 107 cleanerLatch--; 108 notifyAll(); 109 } 110 111 synchronized void execute(Runnable runnable) { 112 pool.execute(runnable); 113 } 114 115 public synchronized void shutdownNow() { 116 if (pool == null || pool.isShutdown()) { 117 return; 118 } 119 pool.shutdownNow(); 120 } 121 122 synchronized void tryUpdatePoolSize(long timeout) { 123 if (!reconfigNotification) { 124 return; 125 } 126 reconfigNotification = false; 127 long stopTime = EnvironmentEdgeManager.currentTime() + timeout; 128 while (cleanerLatch != 0 && timeout > 0) { 129 try { 130 wait(timeout); 131 timeout = stopTime - EnvironmentEdgeManager.currentTime(); 132 } catch (InterruptedException ie) { 133 Thread.currentThread().interrupt(); 134 break; 135 } 136 } 137 LOG.info("Update {} chore's pool size from {} to {}", name, pool.getPoolSize(), size); 138 pool.setCorePoolSize(size); 139 } 140 141 public int getSize() { 142 return size; 143 } 144 145 public static DirScanPool getHFileCleanerScanPool(Configuration conf) { 146 return new DirScanPool(conf, Type.HFILE_CLEANER); 147 } 148 149 public static DirScanPool getHFileCleanerScanPool(String poolSize) { 150 return new DirScanPool(Type.HFILE_CLEANER, poolSize); 151 } 152 153 public static DirScanPool getLogCleanerScanPool(Configuration conf) { 154 return new DirScanPool(conf, Type.LOG_CLEANER); 155 } 156}