001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.concurrent.Callable; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.ExecutorCompletionService; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.Executors; 029import java.util.concurrent.Future; 030import java.util.concurrent.ThreadPoolExecutor; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.Abortable; 033import org.apache.hadoop.hbase.errorhandling.ForeignException; 034import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 035import org.apache.hadoop.hbase.regionserver.RegionServerServices; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.Threads; 038import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 039import org.apache.zookeeper.KeeperException; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 044 045public class SimpleRSProcedureManager extends RegionServerProcedureManager { 046 047 private static final Logger LOG = LoggerFactory.getLogger(SimpleRSProcedureManager.class); 048 049 private RegionServerServices rss; 050 private ProcedureMemberRpcs memberRpcs; 051 private ProcedureMember member; 052 053 @Override 054 public void initialize(RegionServerServices rss) throws KeeperException { 055 this.rss = rss; 056 ZKWatcher zkw = rss.getZooKeeper(); 057 this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature()); 058 059 ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(), 1); 060 this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder()); 061 LOG.info("Initialized: " + rss.getServerName().toString()); 062 } 063 064 @Override 065 public void start() { 066 this.memberRpcs.start(rss.getServerName().toString(), member); 067 LOG.info("Started."); 068 } 069 070 @Override 071 public void stop(boolean force) throws IOException { 072 LOG.info("stop: " + force); 073 try { 074 this.member.close(); 075 } finally { 076 this.memberRpcs.close(); 077 } 078 } 079 080 @Override 081 public String getProcedureSignature() { 082 return SimpleMasterProcedureManager.SIMPLE_SIGNATURE; 083 } 084 085 /** 086 * If in a running state, creates the specified subprocedure for handling a procedure. 087 * @return Subprocedure to submit to the ProcedureMember. 088 */ 089 public Subprocedure buildSubprocedure(String name) { 090 091 // don't run a procedure if the parent is stop(ping) 092 if (rss.isStopping() || rss.isStopped()) { 093 throw new IllegalStateException( 094 "Can't start procedure on RS: " + rss.getServerName() + ", because stopping/stopped!"); 095 } 096 097 LOG.info("Attempting to run a procedure."); 098 ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher(); 099 Configuration conf = rss.getConfiguration(); 100 101 SimpleSubprocedurePool taskManager = 102 new SimpleSubprocedurePool(rss.getServerName().toString(), conf); 103 return new SimpleSubprocedure(rss, member, errorDispatcher, taskManager, name); 104 } 105 106 /** 107 * Build the actual procedure runner that will do all the 'hard' work 108 */ 109 public class SimleSubprocedureBuilder implements SubprocedureFactory { 110 111 @Override 112 public Subprocedure buildSubprocedure(String name, byte[] data) { 113 LOG.info("Building procedure: " + name); 114 return SimpleRSProcedureManager.this.buildSubprocedure(name); 115 } 116 } 117 118 public static class SimpleSubprocedurePool implements Closeable, Abortable { 119 120 private final ExecutorCompletionService<Void> taskPool; 121 private final ExecutorService executor; 122 private volatile boolean aborted; 123 private final List<Future<Void>> futures = new ArrayList<>(); 124 private final String name; 125 126 public SimpleSubprocedurePool(String name, Configuration conf) { 127 this.name = name; 128 executor = Executors.newSingleThreadExecutor( 129 new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-procedure-pool-%d") 130 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 131 taskPool = new ExecutorCompletionService<>(executor); 132 } 133 134 /** 135 * Submit a task to the pool. 136 */ 137 public void submitTask(final Callable<Void> task) { 138 Future<Void> f = this.taskPool.submit(task); 139 futures.add(f); 140 } 141 142 /** 143 * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)} 144 * @return <tt>true</tt> on success, <tt>false</tt> otherwise 145 */ 146 public boolean waitForOutstandingTasks() throws ForeignException { 147 LOG.debug("Waiting for procedure to finish."); 148 149 try { 150 for (Future<Void> f : futures) { 151 f.get(); 152 } 153 return true; 154 } catch (InterruptedException e) { 155 if (aborted) 156 throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!", 157 e); 158 Thread.currentThread().interrupt(); 159 } catch (ExecutionException e) { 160 if (e.getCause() instanceof ForeignException) { 161 throw (ForeignException) e.getCause(); 162 } 163 throw new ForeignException(name, e.getCause()); 164 } finally { 165 // close off remaining tasks 166 for (Future<Void> f : futures) { 167 if (!f.isDone()) { 168 f.cancel(true); 169 } 170 } 171 } 172 return false; 173 } 174 175 /** 176 * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly 177 * finish 178 */ 179 @Override 180 public void close() { 181 executor.shutdown(); 182 } 183 184 @Override 185 public void abort(String why, Throwable e) { 186 if (this.aborted) return; 187 188 this.aborted = true; 189 LOG.warn("Aborting because: " + why, e); 190 this.executor.shutdownNow(); 191 } 192 193 @Override 194 public boolean isAborted() { 195 return this.aborted; 196 } 197 } 198 199 public class SimpleSubprocedure extends Subprocedure { 200 private final RegionServerServices rss; 201 private final SimpleSubprocedurePool taskManager; 202 203 public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member, 204 ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) { 205 super(member, name, errorListener, 500, 60000); 206 LOG.info("Constructing a SimpleSubprocedure."); 207 this.rss = rss; 208 this.taskManager = taskManager; 209 } 210 211 /** 212 * Callable task. TODO. We don't need a thread pool to execute roll log. This can be simplified 213 * with no use of subprocedurepool. 214 */ 215 class RSSimpleTask implements Callable<Void> { 216 RSSimpleTask() { 217 } 218 219 @Override 220 public Void call() throws Exception { 221 LOG.info("Execute subprocedure on " + rss.getServerName().toString()); 222 return null; 223 } 224 225 } 226 227 private void execute() throws ForeignException { 228 229 monitor.rethrowException(); 230 231 // running a task (e.g., roll log, flush table) on region server 232 taskManager.submitTask(new RSSimpleTask()); 233 monitor.rethrowException(); 234 235 // wait for everything to complete. 236 taskManager.waitForOutstandingTasks(); 237 monitor.rethrowException(); 238 239 } 240 241 @Override 242 public void acquireBarrier() throws ForeignException { 243 // do nothing, executing in inside barrier step. 244 } 245 246 /** 247 * do a log roll. 248 */ 249 @Override 250 public byte[] insideBarrier() throws ForeignException { 251 execute(); 252 return Bytes.toBytes(SimpleMasterProcedureManager.SIMPLE_DATA); 253 } 254 255 /** 256 * Cancel threads if they haven't finished. 257 */ 258 @Override 259 public void cleanup(Exception e) { 260 taskManager.abort("Aborting simple subprocedure tasks due to error", e); 261 } 262 } 263 264}