001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.lang.Thread.UncaughtExceptionHandler;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Optional;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.DelayQueue;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
034import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp;
035import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
036import org.apache.hadoop.hbase.procedure2.util.StringUtils;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.Threads;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
044import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
045
046/**
047 * A procedure dispatcher that aggregates and sends after elapsed time or after we hit count
048 * threshold. Creates its own threadpool to run RPCs with timeout.
049 * <ul>
050 * <li>Each server queue has a dispatch buffer</li>
 * <li>Once the dispatch buffer reaches a threshold-size/time we send</li>
053 * </ul>
054 * <p>
055 * Call {@link #start()} and then {@link #submitTask(Runnable)}. When done, call {@link #stop()}.
056 */
057@InterfaceAudience.Private
058public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> {
  private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureDispatcher.class);

  // Configuration key (and default) for the size handed to the bounded cached thread pool that
  // start() creates.
  public static final String THREAD_POOL_SIZE_CONF_KEY =
    "hbase.procedure.remote.dispatcher.threadpool.size";
  private static final int DEFAULT_THREAD_POOL_SIZE = 128;

  // Configuration key (and default) for how long a BufferNode waits after its first buffered
  // operation before it is dispatched by the timeout thread; per the key name, in milliseconds.
  public static final String DISPATCH_DELAY_CONF_KEY =
    "hbase.procedure.remote.dispatcher.delay.msec";
  private static final int DEFAULT_DISPATCH_DELAY = 150;

  // Configuration key (and default) for the buffered-operation count above which a BufferNode
  // dispatches immediately instead of waiting for the delay.
  public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY =
    "hbase.procedure.remote.dispatcher.max.queue.size";
  private static final int DEFAULT_MAX_QUEUE_SIZE = 32;

  // Set to true by start() and to false by stop(); also the loop condition of the timeout thread.
  private final AtomicBoolean running = new AtomicBoolean(false);
  // One dispatch buffer per remote node, keyed by the node identifier.
  private final ConcurrentHashMap<TRemote, BufferNode> nodeMap =
    new ConcurrentHashMap<TRemote, BufferNode>();

  // Values read once from the Configuration in the constructor.
  private final int operationDelay;
  private final int queueMaxSize;
  private final int corePoolSize;

  // Both are created in start(); timeoutExecutor is reset to null by join().
  private TimeoutExecutorThread timeoutExecutor;
  private ThreadPoolExecutor threadPool;
083
084  protected RemoteProcedureDispatcher(Configuration conf) {
085    this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE);
086    this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY);
087    this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE);
088  }
089
090  public boolean start() {
091    if (running.getAndSet(true)) {
092      LOG.warn("Already running");
093      return false;
094    }
095
096    LOG.info("Instantiated, coreThreads={} (allowCoreThreadTimeOut=true), queueMaxSize={}, "
097      + "operationDelay={}", this.corePoolSize, this.queueMaxSize, this.operationDelay);
098
099    // Create the timeout executor
100    timeoutExecutor = new TimeoutExecutorThread();
101    timeoutExecutor.start();
102
103    // Create the thread pool that will execute RPCs
104    threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
105      new ThreadFactoryBuilder().setNameFormat(this.getClass().getSimpleName() + "-pool-%d")
106        .setDaemon(true).setUncaughtExceptionHandler(getUncaughtExceptionHandler()).build());
107    return true;
108  }
109
110  protected void setTimeoutExecutorUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
111    timeoutExecutor.setUncaughtExceptionHandler(eh);
112  }
113
114  public boolean stop() {
115    if (!running.getAndSet(false)) {
116      return false;
117    }
118
119    LOG.info("Stopping procedure remote dispatcher");
120
121    // send stop signals
122    timeoutExecutor.sendStopSignal();
123    threadPool.shutdownNow();
124    return true;
125  }
126
127  public void join() {
128    assert !running.get() : "expected not running";
129
130    // wait the timeout executor
131    timeoutExecutor.awaitTermination();
132    timeoutExecutor = null;
133
134    // wait for the thread pool to terminate
135    threadPool.shutdownNow();
136    try {
137      while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
138        LOG.warn("Waiting for thread-pool to terminate");
139      }
140    } catch (InterruptedException e) {
141      LOG.warn("Interrupted while waiting for thread-pool termination", e);
142    }
143  }
144
  /**
   * Supplies the handler that {@link #start()} installs on the threads of the dispatcher's thread
   * pool.
   * @return the uncaught exception handler for pool threads
   */
  protected abstract UncaughtExceptionHandler getUncaughtExceptionHandler();
146
147  // ============================================================================================
148  // Node Helpers
149  // ============================================================================================
150  /**
151   * Add a node that will be able to execute remote procedures
152   * @param key the node identifier
153   */
154  public void addNode(final TRemote key) {
155    assert key != null : "Tried to add a node with a null key";
156    nodeMap.computeIfAbsent(key, k -> new BufferNode(k));
157  }
158
159  /**
160   * Add a remote rpc.
161   * @param key the node identifier
162   */
163  public void addOperationToNode(final TRemote key, RemoteProcedure rp)
164    throws NullTargetServerDispatchException, NoServerDispatchException, NoNodeDispatchException {
165    if (key == null) {
166      throw new NullTargetServerDispatchException(rp.toString());
167    }
168    BufferNode node = nodeMap.get(key);
169    if (node == null) {
170      // If null here, it means node has been removed because it crashed. This happens when server
171      // is expired in ServerManager. ServerCrashProcedure may or may not have run.
172      throw new NoServerDispatchException(key.toString() + "; " + rp.toString());
173    }
174    node.add(rp);
175    // Check our node still in the map; could have been removed by #removeNode.
176    if (!nodeMap.containsValue(node)) {
177      throw new NoNodeDispatchException(key.toString() + "; " + rp.toString());
178    }
179  }
180
181  public void removeCompletedOperation(final TRemote key, RemoteProcedure rp) {
182    BufferNode node = nodeMap.get(key);
183    if (node == null) {
184      LOG.warn("since no node for this key {}, we can't removed the finished remote procedure",
185        key);
186      return;
187    }
188    node.operationCompleted(rp);
189  }
190
191  /**
192   * Remove a remote node
193   * @param key the node identifier
194   */
195  public boolean removeNode(final TRemote key) {
196    final BufferNode node = nodeMap.remove(key);
197    if (node == null) {
198      return false;
199    }
200
201    node.abortOperationsInQueue();
202    return true;
203  }
204
205  // ============================================================================================
206  // Task Helpers
207  // ============================================================================================
208  protected final void submitTask(Runnable task) {
209    threadPool.execute(task);
210  }
211
212  protected final void submitTask(Runnable task, long delay, TimeUnit unit) {
213    timeoutExecutor.add(new DelayedTask(task, delay, unit));
214  }
215
  /**
   * Sends the buffered operations to the given remote node. Invoked by a node's buffer when it is
   * dispatched, while the buffer's monitor is held.
   * @param key        the node identifier
   * @param operations the operations that were buffered for the node
   */
  protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations);
217
  /**
   * Aborts operations belonging to a node that is being removed. When a node is removed this is
   * called for the still-buffered operations (if any) and again for the operations that were
   * already dispatched and are still being tracked.
   * @param key        the node identifier
   * @param operations the operations to abort
   */
  protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations);
219
220  /**
221   * Data structure with reference to remote operation.
222   */
223  public static abstract class RemoteOperation {
224    private final RemoteProcedure remoteProcedure;
225    // active time of the master that sent this request, used for fencing
226    private final long initiatingMasterActiveTime;
227
228    protected RemoteOperation(final RemoteProcedure remoteProcedure,
229      long initiatingMasterActiveTime) {
230      this.remoteProcedure = remoteProcedure;
231      this.initiatingMasterActiveTime = initiatingMasterActiveTime;
232    }
233
234    public RemoteProcedure getRemoteProcedure() {
235      return remoteProcedure;
236    }
237
238    public long getInitiatingMasterActiveTime() {
239      return initiatingMasterActiveTime;
240    }
241  }
242
  /**
   * A procedure that can be dispatched to a remote node and is notified about the outcome.
   */
  public interface RemoteProcedure<TEnv, TRemote> {
    /**
     * Builds the remote operation to send. The result may be empty when there is no need to send
     * a remote call, which usually means the RemoteProcedure has already finished. This can
     * happen when the procedure was sent to the RS but the rpc connection then broke, so the
     * executeProcedures call failed even though the RS received and executed the procedure and
     * reported back before the retry.
     * @param env    the environment
     * @param remote the remote node the operation is destined for
     * @return the operation to send, or empty if nothing needs to be sent
     */
    Optional<RemoteOperation> remoteCallBuild(TEnv env, TRemote remote);

    /**
     * Called when the executeProcedure call has failed.
     * @param env       the environment
     * @param remote    the remote node the call was sent to
     * @param exception the failure
     */
    void remoteCallFailed(TEnv env, TRemote remote, IOException exception);

    /**
     * Called when the RS reports, through the {@code reportProcedureDone} method, that the remote
     * procedure has succeeded.
     * @param env the environment
     */
    void remoteOperationCompleted(TEnv env);

    /**
     * Called when the RS reports, through the {@code reportProcedureDone} method, that the remote
     * procedure has failed.
     * @param env   the environment
     * @param error the reported failure
     */
    void remoteOperationFailed(TEnv env, RemoteProcedureException error);

    /**
     * Whether the dispatcher should keep this procedure in its dispatched-operations set after
     * sending it. Only OpenRegionProcedure and CloseRegionProcedure return false, since they are
     * not fully controlled by the dispatcher.
     * @return {@code true} by default
     */
    default boolean storeInDispatchedQueue() {
      return true;
    }
  }
281
  /**
   * Account of what procedures are running on a remote node.
   */
  public interface RemoteNode<TEnv, TRemote> {
    /** Returns the identifier of the remote node. */
    TRemote getKey();

    /**
     * Adds an operation destined for this node.
     * @param operation the remote procedure to add
     */
    void add(RemoteProcedure<TEnv, TRemote> operation);

    /** Dispatches the operations added so far to the remote node. */
    void dispatch();
  }
292
293  protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
294    final TRemote remote, final Set<RemoteProcedure> remoteProcedures) {
295    final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
296    for (RemoteProcedure proc : remoteProcedures) {
297      Optional<RemoteOperation> operation = proc.remoteCallBuild(env, remote);
298      operation.ifPresent(op -> requestByType.put(op.getClass(), op));
299    }
300    return requestByType;
301  }
302
303  protected <T extends RemoteOperation> List<T> fetchType(
304    final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) {
305    return (List<T>) requestByType.removeAll(type);
306  }
307
308  @RestrictedApi(explanation = "Should only be called in tests", link = "",
309      allowedOnPath = ".*/src/test/.*")
310  public boolean hasNode(TRemote key) {
311    return nodeMap.containsKey(key);
312  }
313
314  // ============================================================================================
315  // Timeout Helpers
316  // ============================================================================================
317  private final class TimeoutExecutorThread extends Thread {
318    private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>();
319
320    public TimeoutExecutorThread() {
321      super("ProcedureDispatcherTimeoutThread");
322    }
323
324    @Override
325    public void run() {
326      while (running.get()) {
327        final DelayedWithTimeout task =
328          DelayedUtil.takeWithoutInterrupt(queue, 20, TimeUnit.SECONDS);
329        if (task == null || task == DelayedUtil.DELAYED_POISON) {
330          if (task == null && queue.size() > 0) {
331            LOG.error("DelayQueue for RemoteProcedureDispatcher is not empty when timed waiting"
332              + " elapsed. If this is repeated consistently, it means no element is getting expired"
333              + " from the queue and it might freeze the system. Queue: {}", queue);
334          }
335          // the executor may be shutting down, and the task is just the shutdown request
336          continue;
337        }
338        if (task instanceof DelayedTask) {
339          threadPool.execute(((DelayedTask) task).getObject());
340        } else {
341          ((BufferNode) task).dispatch();
342        }
343      }
344    }
345
346    public void add(final DelayedWithTimeout delayed) {
347      queue.add(delayed);
348    }
349
350    public void remove(final DelayedWithTimeout delayed) {
351      queue.remove(delayed);
352    }
353
354    public void sendStopSignal() {
355      queue.add(DelayedUtil.DELAYED_POISON);
356    }
357
358    public void awaitTermination() {
359      try {
360        final long startTime = EnvironmentEdgeManager.currentTime();
361        for (int i = 0; isAlive(); ++i) {
362          sendStopSignal();
363          join(250);
364          if (i > 0 && (i % 8) == 0) {
365            LOG.warn("Waiting termination of thread " + getName() + ", "
366              + StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
367          }
368        }
369      } catch (InterruptedException e) {
370        LOG.warn(getName() + " join wait got interrupted", e);
371      }
372    }
373  }
374
375  // ============================================================================================
376  // Internals Helpers
377  // ============================================================================================
378
379  /**
380   * Node that contains a set of RemoteProcedures
381   */
382  protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
383    implements RemoteNode<TEnv, TRemote> {
384    private Set<RemoteProcedure> operations;
385    private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>();
386
387    protected BufferNode(final TRemote key) {
388      super(key, 0);
389    }
390
391    @Override
392    public TRemote getKey() {
393      return getObject();
394    }
395
396    @Override
397    public synchronized void add(final RemoteProcedure operation) {
398      if (this.operations == null) {
399        this.operations = new HashSet<>();
400        setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
401        timeoutExecutor.add(this);
402      }
403      this.operations.add(operation);
404      if (this.operations.size() > queueMaxSize) {
405        timeoutExecutor.remove(this);
406        dispatch();
407      }
408    }
409
410    @Override
411    public synchronized void dispatch() {
412      if (operations != null) {
413        remoteDispatch(getKey(), operations);
414        operations.stream().filter(operation -> operation.storeInDispatchedQueue())
415          .forEach(operation -> dispatchedOperations.add(operation));
416        this.operations = null;
417      }
418    }
419
420    public synchronized void abortOperationsInQueue() {
421      if (operations != null) {
422        abortPendingOperations(getKey(), operations);
423        this.operations = null;
424      }
425      abortPendingOperations(getKey(), dispatchedOperations);
426      this.dispatchedOperations.clear();
427    }
428
429    public synchronized void operationCompleted(final RemoteProcedure remoteProcedure) {
430      this.dispatchedOperations.remove(remoteProcedure);
431    }
432
433    @Override
434    public String toString() {
435      return super.toString() + ", operations=" + this.operations;
436    }
437  }
438
439  /**
440   * Delayed object that holds a FutureTask.
441   * <p/>
442   * used to submit something later to the thread-pool.
443   */
444  private static final class DelayedTask extends DelayedContainerWithTimestamp<Runnable> {
445    public DelayedTask(Runnable task, long delay, TimeUnit unit) {
446      super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay));
447    }
448  }
449}