001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.LongConsumer;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.MetaTableAccessor;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.TableNotFoundException;
029import org.apache.hadoop.hbase.client.Connection;
030import org.apache.hadoop.hbase.client.TableDescriptor;
031import org.apache.hadoop.hbase.client.TableState;
032import org.apache.hadoop.hbase.master.TableStateManager;
033import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
034import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
035import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
036import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
037import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
038import org.apache.hadoop.hbase.replication.ReplicationException;
039import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
040import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
041import org.apache.hadoop.hbase.util.Pair;
042import org.apache.hadoop.hbase.util.RetryCounter;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
049
050/**
051 * The base class for all replication peer related procedure except sync replication state
052 * transition.
053 */
054@InterfaceAudience.Private
055public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState> {
056
057  private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
058
059  protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
060
061  // The sleep interval when waiting table to be enabled or disabled.
062  protected static final int SLEEP_INTERVAL_MS = 1000;
063
064  private RetryCounter retryCounter;
065
066  protected ModifyPeerProcedure() {
067  }
068
069  protected ModifyPeerProcedure(String peerId) {
070    super(peerId);
071  }
072
073  /**
074   * Called before we start the actual processing. The implementation should call the pre CP hook,
075   * and also the pre-check for the peer modification.
076   * <p>
077   * If an IOException is thrown then we will give up and mark the procedure as failed directly. If
078   * all checks passes then the procedure can not be rolled back any more.
079   */
080  protected abstract void prePeerModification(MasterProcedureEnv env)
081    throws IOException, ReplicationException;
082
083  protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException;
084
085  /**
086   * Called before we finish the procedure. The implementation can do some logging work, and also
087   * call the coprocessor hook if any.
088   * <p>
089   * Notice that, since we have already done the actual work, throwing {@code IOException} here will
090   * not fail this procedure, we will just ignore it and finish the procedure as suceeded. If
091   * {@code ReplicationException} is thrown we will retry since this usually means we fails to
092   * update the peer storage.
093   */
094  protected abstract void postPeerModification(MasterProcedureEnv env)
095    throws IOException, ReplicationException;
096
097  private void releaseLatch() {
098    ProcedurePrepareLatch.releaseLatch(latch, this);
099  }
100
101  /**
102   * Implementation class can override this method. By default we will jump to
103   * POST_PEER_MODIFICATION and finish the procedure.
104   */
105  protected PeerModificationState nextStateAfterRefresh() {
106    return PeerModificationState.POST_PEER_MODIFICATION;
107  }
108
109  /**
110   * The implementation class should override this method if the procedure may enter the serial
111   * related states.
112   */
113  protected boolean enablePeerBeforeFinish() {
114    throw new UnsupportedOperationException();
115  }
116
117  private void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
118    addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
119      .map(sn -> new RefreshPeerProcedure(peerId, type, sn)).toArray(RefreshPeerProcedure[]::new));
120  }
121
122  protected ReplicationPeerConfig getOldPeerConfig() {
123    return null;
124  }
125
126  protected ReplicationPeerConfig getNewPeerConfig() {
127    throw new UnsupportedOperationException();
128  }
129
130  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
131    throws IOException, ReplicationException {
132    throw new UnsupportedOperationException();
133  }
134
135  // If the table is in enabling state, we need to wait until it is enabled and then reopen all its
136  // regions.
137  private boolean needReopen(TableStateManager tsm, TableName tn) throws IOException {
138    for (;;) {
139      try {
140        TableState state = tsm.getTableState(tn);
141        if (state.isEnabled()) {
142          return true;
143        }
144        if (!state.isEnabling()) {
145          return false;
146        }
147        Thread.sleep(SLEEP_INTERVAL_MS);
148      } catch (TableNotFoundException e) {
149        return false;
150      } catch (InterruptedException e) {
151        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
152      }
153    }
154  }
155
156  // will be override in test to simulate error
157  protected void reopenRegions(MasterProcedureEnv env) throws IOException {
158    ReplicationPeerConfig peerConfig = getNewPeerConfig();
159    ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
160    TableStateManager tsm = env.getMasterServices().getTableStateManager();
161    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
162      if (!td.hasGlobalReplicationScope()) {
163        continue;
164      }
165      TableName tn = td.getTableName();
166      if (!peerConfig.needToReplicate(tn)) {
167        continue;
168      }
169      if (oldPeerConfig != null && oldPeerConfig.isSerial() && oldPeerConfig.needToReplicate(tn)) {
170        continue;
171      }
172      if (needReopen(tsm, tn)) {
173        addChildProcedure(new ReopenTableRegionsProcedure(tn));
174      }
175    }
176  }
177
178  // will be override in test to simulate error
179  protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
180    env.getReplicationPeerManager().enablePeer(peerId);
181  }
182
183  private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
184    ReplicationQueueStorage queueStorage) throws ReplicationException {
185    if (barrier >= 0) {
186      lastSeqIds.put(encodedRegionName, barrier);
187      if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
188        queueStorage.setLastSequenceIds(peerId, lastSeqIds);
189        lastSeqIds.clear();
190      }
191    }
192  }
193
194  protected final void setLastPushedSequenceId(MasterProcedureEnv env,
195    ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
196    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
197    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
198      if (!td.hasGlobalReplicationScope()) {
199        continue;
200      }
201      TableName tn = td.getTableName();
202      if (!peerConfig.needToReplicate(tn)) {
203        continue;
204      }
205      setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
206    }
207    if (!lastSeqIds.isEmpty()) {
208      env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
209    }
210  }
211
212  // If the table is currently disabling, then we need to wait until it is disabled.We will write
213  // replication barrier for a disabled table. And return whether we need to update the last pushed
214  // sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException,
215  // then we do not need to update last pushed sequence id for this table.
216  private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
217    throws IOException {
218    for (;;) {
219      try {
220        if (!tsm.getTableState(tn).isDisabling()) {
221          return true;
222        }
223        Thread.sleep(SLEEP_INTERVAL_MS);
224      } catch (TableNotFoundException e) {
225        return false;
226      } catch (InterruptedException e) {
227        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
228      }
229    }
230  }
231
232  // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
233  // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
234  // should not forget to check whether the map is empty at last, if not you should call
235  // queueStorage.setLastSequenceIds to write out the remaining entries in the map.
236  protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
237    Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
238    TableStateManager tsm = env.getMasterServices().getTableStateManager();
239    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
240    Connection conn = env.getMasterServices().getConnection();
241    if (!needSetLastPushedSequenceId(tsm, tableName)) {
242      LOG.debug("Skip settting last pushed sequence id for {}", tableName);
243      return;
244    }
245    for (Pair<String, Long> name2Barrier : MetaTableAccessor
246      .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
247      LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
248      addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
249        queueStorage);
250    }
251  }
252
253  @Override
254  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
255    setState(ProcedureProtos.ProcedureState.RUNNABLE);
256    env.getProcedureScheduler().addFront(this);
257    return false;
258  }
259
260  private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer)
261    throws ProcedureSuspendedException {
262    if (retryCounter == null) {
263      retryCounter = ProcedureUtil.createRetryCounter(conf);
264    }
265    long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
266    backoffConsumer.accept(backoff);
267    setTimeout(Math.toIntExact(backoff));
268    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
269    skipPersistence();
270    throw new ProcedureSuspendedException();
271  }
272
273  @Override
274  protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
275    throws ProcedureSuspendedException {
276    switch (state) {
277      case PRE_PEER_MODIFICATION:
278        try {
279          checkPeerModificationEnabled(env);
280          prePeerModification(env);
281        } catch (IOException e) {
282          LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, "
283            + "mark the procedure as failure and give up", getClass().getName(), peerId, e);
284          setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
285          releaseLatch();
286          return Flow.NO_MORE_STATE;
287        } catch (ReplicationException e) {
288          throw suspend(env.getMasterConfiguration(),
289            backoff -> LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
290              getClass().getName(), peerId, backoff / 1000, e));
291        }
292        retryCounter = null;
293        setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
294        return Flow.HAS_MORE_STATE;
295      case UPDATE_PEER_STORAGE:
296        try {
297          updatePeerStorage(env);
298        } catch (ReplicationException e) {
299          throw suspend(env.getMasterConfiguration(),
300            backoff -> LOG.warn("{} update peer storage for peer {} failed, sleep {} secs",
301              getClass().getName(), peerId, backoff / 1000, e));
302        }
303        retryCounter = null;
304        setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
305        return Flow.HAS_MORE_STATE;
306      case REFRESH_PEER_ON_RS:
307        refreshPeer(env, getPeerOperationType());
308        setNextState(nextStateAfterRefresh());
309        return Flow.HAS_MORE_STATE;
310      case SERIAL_PEER_REOPEN_REGIONS:
311        try {
312          reopenRegions(env);
313        } catch (Exception e) {
314          throw suspend(env.getMasterConfiguration(),
315            backoff -> LOG.warn("{} reopen regions for peer {} failed,  sleep {} secs",
316              getClass().getName(), peerId, backoff / 1000, e));
317        }
318        retryCounter = null;
319        setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
320        return Flow.HAS_MORE_STATE;
321      case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
322        try {
323          updateLastPushedSequenceIdForSerialPeer(env);
324        } catch (Exception e) {
325          throw suspend(env.getMasterConfiguration(),
326            backoff -> LOG.warn("{} set last sequence id for peer {} failed,  sleep {} secs",
327              getClass().getName(), peerId, backoff / 1000, e));
328        }
329        retryCounter = null;
330        setNextState(enablePeerBeforeFinish()
331          ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
332          : PeerModificationState.POST_PEER_MODIFICATION);
333        return Flow.HAS_MORE_STATE;
334      case SERIAL_PEER_SET_PEER_ENABLED:
335        try {
336          enablePeer(env);
337        } catch (ReplicationException e) {
338          throw suspend(env.getMasterConfiguration(),
339            backoff -> LOG.warn("{} enable peer before finish for peer {} failed,  sleep {} secs",
340              getClass().getName(), peerId, backoff / 1000, e));
341        }
342        retryCounter = null;
343        setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
344        return Flow.HAS_MORE_STATE;
345      case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
346        refreshPeer(env, PeerOperationType.ENABLE);
347        setNextState(PeerModificationState.POST_PEER_MODIFICATION);
348        return Flow.HAS_MORE_STATE;
349      case POST_PEER_MODIFICATION:
350        try {
351          postPeerModification(env);
352        } catch (ReplicationException e) {
353          throw suspend(env.getMasterConfiguration(),
354            backoff -> LOG.warn(
355              "{} failed to call postPeerModification for peer {},  sleep {} secs",
356              getClass().getName(), peerId, backoff / 1000, e));
357        } catch (IOException e) {
358          LOG.warn("{} failed to call post CP hook for peer {}, "
359            + "ignore since the procedure has already done", getClass().getName(), peerId, e);
360        }
361        releaseLatch();
362        return Flow.NO_MORE_STATE;
363      default:
364        throw new UnsupportedOperationException("unhandled state=" + state);
365    }
366  }
367
368  @Override
369  protected void rollbackState(MasterProcedureEnv env, PeerModificationState state)
370    throws IOException, InterruptedException {
371    if (state == PeerModificationState.PRE_PEER_MODIFICATION) {
372      // actually the peer related operations has no rollback, but if we haven't done any
373      // modifications on the peer storage yet, we can just return.
374      return;
375    }
376    throw new UnsupportedOperationException();
377  }
378
379  @Override
380  protected PeerModificationState getState(int stateId) {
381    return PeerModificationState.forNumber(stateId);
382  }
383
384  @Override
385  protected int getStateId(PeerModificationState state) {
386    return state.getNumber();
387  }
388
389  @Override
390  protected PeerModificationState getInitialState() {
391    return PeerModificationState.PRE_PEER_MODIFICATION;
392  }
393}