001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.util.concurrent.DelayQueue; 021import java.util.concurrent.TimeUnit; 022import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; 023import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout; 024import org.apache.yetus.audience.InterfaceAudience; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027 028/** 029 * Runs task on a period such as check for stuck workers. 030 * @see InlineChore 031 */ 032@InterfaceAudience.Private 033class TimeoutExecutorThread<TEnvironment> extends StoppableThread { 034 035 private static final Logger LOG = LoggerFactory.getLogger(TimeoutExecutorThread.class); 036 037 private final ProcedureExecutor<TEnvironment> executor; 038 039 private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>(); 040 041 public TimeoutExecutorThread(ProcedureExecutor<TEnvironment> executor, ThreadGroup group, 042 String name) { 043 super(group, name); 044 setDaemon(true); 045 this.executor = executor; 046 } 047 048 @Override 049 public void sendStopSignal() { 050 queue.add(DelayedUtil.DELAYED_POISON); 051 } 052 053 @Override 054 public void run() { 055 while (executor.isRunning()) { 056 final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue, 20, TimeUnit.SECONDS); 057 if (task == null || task == DelayedUtil.DELAYED_POISON) { 058 // the executor may be shutting down, 059 // and the task is just the shutdown request 060 continue; 061 } 062 LOG.trace("Executing {}", task); 063 064 // execute the task 065 if (task instanceof InlineChore) { 066 execInlineChore((InlineChore) task); 067 } else if (task instanceof DelayedProcedure) { 068 execDelayedProcedure((DelayedProcedure<TEnvironment>) task); 069 } else { 070 LOG.error("CODE-BUG unknown timeout task type {}", task); 071 } 072 } 073 } 074 075 public void add(InlineChore chore) { 076 chore.refreshTimeout(); 077 queue.add(chore); 078 } 079 080 public void add(Procedure<TEnvironment> procedure) { 081 if (procedure.getTimeout() > 0) { 082 LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), 083 procedure.getTimeoutTimestamp()); 084 queue.add(new DelayedProcedure<>(procedure)); 085 } else { 086 LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure); 087 } 088 } 089 090 public boolean remove(Procedure<TEnvironment> procedure) { 091 return queue.remove(new DelayedProcedure<>(procedure)); 092 } 093 094 private void execInlineChore(InlineChore chore) { 095 chore.run(); 096 add(chore); 097 } 098 099 private void execDelayedProcedure(DelayedProcedure<TEnvironment> delayed) { 100 // TODO: treat this as a normal procedure, add it to the scheduler and 101 // let one of the workers handle it. 102 // Today we consider ProcedureInMemoryChore as InlineChores 103 Procedure<TEnvironment> procedure = delayed.getObject(); 104 if (procedure instanceof ProcedureInMemoryChore) { 105 executeInMemoryChore((ProcedureInMemoryChore<TEnvironment>) procedure); 106 // if the procedure is in a waiting state again, put it back in the queue 107 procedure.updateTimestamp(); 108 if (procedure.isWaiting()) { 109 delayed.setTimeout(procedure.getTimeoutTimestamp()); 110 queue.add(delayed); 111 } 112 } else { 113 executeTimedoutProcedure(procedure); 114 } 115 } 116 117 private void executeInMemoryChore(ProcedureInMemoryChore<TEnvironment> chore) { 118 if (!chore.isWaiting()) { 119 return; 120 } 121 122 // The ProcedureInMemoryChore is a special case, and it acts as a chore. 123 // instead of bringing the Chore class in, we reuse this timeout thread for 124 // this special case. 125 try { 126 chore.periodicExecute(executor.getEnvironment()); 127 } catch (Throwable e) { 128 LOG.error("Ignoring {} exception: {}", chore, e.getMessage(), e); 129 } 130 } 131 132 protected void executeTimedoutProcedure(Procedure<TEnvironment> proc) { 133 // The procedure received a timeout. if the procedure itself does not handle it, 134 // call abort() and add the procedure back in the queue for rollback. 135 if (proc.setTimeoutFailure(executor.getEnvironment())) { 136 long rootProcId = executor.getRootProcedureId(proc); 137 RootProcedureState<TEnvironment> procStack = executor.getProcStack(rootProcId); 138 procStack.abort(); 139 executor.getStore().update(proc); 140 executor.getScheduler().addFront(proc); 141 } 142 } 143}