001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.HashMap; 023import java.util.Map; 024import java.util.function.LongConsumer; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.MetaTableAccessor; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.TableNotFoundException; 029import org.apache.hadoop.hbase.client.Connection; 030import org.apache.hadoop.hbase.client.TableDescriptor; 031import org.apache.hadoop.hbase.client.TableState; 032import org.apache.hadoop.hbase.master.TableStateManager; 033import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 034import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; 035import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure; 036import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 037import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 038import org.apache.hadoop.hbase.replication.ReplicationException; 039import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 040import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 041import org.apache.hadoop.hbase.util.Pair; 042import org.apache.hadoop.hbase.util.RetryCounter; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; 048import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 049 050/** 051 * The base class for all replication peer related procedure except sync replication state 052 * transition. 053 */ 054@InterfaceAudience.Private 055public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState> { 056 057 private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class); 058 059 protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000; 060 061 // The sleep interval when waiting table to be enabled or disabled. 062 protected static final int SLEEP_INTERVAL_MS = 1000; 063 064 private RetryCounter retryCounter; 065 066 protected ModifyPeerProcedure() { 067 } 068 069 protected ModifyPeerProcedure(String peerId) { 070 super(peerId); 071 } 072 073 /** 074 * Called before we start the actual processing. The implementation should call the pre CP hook, 075 * and also the pre-check for the peer modification. 076 * <p> 077 * If an IOException is thrown then we will give up and mark the procedure as failed directly. If 078 * all checks passes then the procedure can not be rolled back any more. 079 */ 080 protected abstract void prePeerModification(MasterProcedureEnv env) 081 throws IOException, ReplicationException; 082 083 protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException; 084 085 /** 086 * Called before we finish the procedure. The implementation can do some logging work, and also 087 * call the coprocessor hook if any. 088 * <p> 089 * Notice that, since we have already done the actual work, throwing {@code IOException} here will 090 * not fail this procedure, we will just ignore it and finish the procedure as suceeded. If 091 * {@code ReplicationException} is thrown we will retry since this usually means we fails to 092 * update the peer storage. 093 */ 094 protected abstract void postPeerModification(MasterProcedureEnv env) 095 throws IOException, ReplicationException; 096 097 private void releaseLatch() { 098 ProcedurePrepareLatch.releaseLatch(latch, this); 099 } 100 101 /** 102 * Implementation class can override this method. By default we will jump to 103 * POST_PEER_MODIFICATION and finish the procedure. 104 */ 105 protected PeerModificationState nextStateAfterRefresh() { 106 return PeerModificationState.POST_PEER_MODIFICATION; 107 } 108 109 /** 110 * The implementation class should override this method if the procedure may enter the serial 111 * related states. 112 */ 113 protected boolean enablePeerBeforeFinish() { 114 throw new UnsupportedOperationException(); 115 } 116 117 private void refreshPeer(MasterProcedureEnv env, PeerOperationType type) { 118 addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() 119 .map(sn -> new RefreshPeerProcedure(peerId, type, sn)).toArray(RefreshPeerProcedure[]::new)); 120 } 121 122 protected ReplicationPeerConfig getOldPeerConfig() { 123 return null; 124 } 125 126 protected ReplicationPeerConfig getNewPeerConfig() { 127 throw new UnsupportedOperationException(); 128 } 129 130 protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env) 131 throws IOException, ReplicationException { 132 throw new UnsupportedOperationException(); 133 } 134 135 // If the table is in enabling state, we need to wait until it is enabled and then reopen all its 136 // regions. 137 private boolean needReopen(TableStateManager tsm, TableName tn) throws IOException { 138 for (;;) { 139 try { 140 TableState state = tsm.getTableState(tn); 141 if (state.isEnabled()) { 142 return true; 143 } 144 if (!state.isEnabling()) { 145 return false; 146 } 147 Thread.sleep(SLEEP_INTERVAL_MS); 148 } catch (TableNotFoundException e) { 149 return false; 150 } catch (InterruptedException e) { 151 throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e); 152 } 153 } 154 } 155 156 // will be override in test to simulate error 157 protected void reopenRegions(MasterProcedureEnv env) throws IOException { 158 ReplicationPeerConfig peerConfig = getNewPeerConfig(); 159 ReplicationPeerConfig oldPeerConfig = getOldPeerConfig(); 160 TableStateManager tsm = env.getMasterServices().getTableStateManager(); 161 for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) { 162 if (!td.hasGlobalReplicationScope()) { 163 continue; 164 } 165 TableName tn = td.getTableName(); 166 if (!peerConfig.needToReplicate(tn)) { 167 continue; 168 } 169 if (oldPeerConfig != null && oldPeerConfig.isSerial() && oldPeerConfig.needToReplicate(tn)) { 170 continue; 171 } 172 if (needReopen(tsm, tn)) { 173 addChildProcedure(new ReopenTableRegionsProcedure(tn)); 174 } 175 } 176 } 177 178 // will be override in test to simulate error 179 protected void enablePeer(MasterProcedureEnv env) throws ReplicationException { 180 env.getReplicationPeerManager().enablePeer(peerId); 181 } 182 183 private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier, 184 ReplicationQueueStorage queueStorage) throws ReplicationException { 185 if (barrier >= 0) { 186 lastSeqIds.put(encodedRegionName, barrier); 187 if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) { 188 queueStorage.setLastSequenceIds(peerId, lastSeqIds); 189 lastSeqIds.clear(); 190 } 191 } 192 } 193 194 protected final void setLastPushedSequenceId(MasterProcedureEnv env, 195 ReplicationPeerConfig peerConfig) throws IOException, ReplicationException { 196 Map<String, Long> lastSeqIds = new HashMap<String, Long>(); 197 for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) { 198 if (!td.hasGlobalReplicationScope()) { 199 continue; 200 } 201 TableName tn = td.getTableName(); 202 if (!peerConfig.needToReplicate(tn)) { 203 continue; 204 } 205 setLastPushedSequenceIdForTable(env, tn, lastSeqIds); 206 } 207 if (!lastSeqIds.isEmpty()) { 208 env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds); 209 } 210 } 211 212 // If the table is currently disabling, then we need to wait until it is disabled.We will write 213 // replication barrier for a disabled table. And return whether we need to update the last pushed 214 // sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException, 215 // then we do not need to update last pushed sequence id for this table. 216 private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn) 217 throws IOException { 218 for (;;) { 219 try { 220 if (!tsm.getTableState(tn).isDisabling()) { 221 return true; 222 } 223 Thread.sleep(SLEEP_INTERVAL_MS); 224 } catch (TableNotFoundException e) { 225 return false; 226 } catch (InterruptedException e) { 227 throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e); 228 } 229 } 230 } 231 232 // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is 233 // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller 234 // should not forget to check whether the map is empty at last, if not you should call 235 // queueStorage.setLastSequenceIds to write out the remaining entries in the map. 236 protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName, 237 Map<String, Long> lastSeqIds) throws IOException, ReplicationException { 238 TableStateManager tsm = env.getMasterServices().getTableStateManager(); 239 ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage(); 240 Connection conn = env.getMasterServices().getConnection(); 241 if (!needSetLastPushedSequenceId(tsm, tableName)) { 242 LOG.debug("Skip settting last pushed sequence id for {}", tableName); 243 return; 244 } 245 for (Pair<String, Long> name2Barrier : MetaTableAccessor 246 .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) { 247 LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier); 248 addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1, 249 queueStorage); 250 } 251 } 252 253 @Override 254 protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { 255 setState(ProcedureProtos.ProcedureState.RUNNABLE); 256 env.getProcedureScheduler().addFront(this); 257 return false; 258 } 259 260 private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer) 261 throws ProcedureSuspendedException { 262 if (retryCounter == null) { 263 retryCounter = ProcedureUtil.createRetryCounter(conf); 264 } 265 long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); 266 backoffConsumer.accept(backoff); 267 setTimeout(Math.toIntExact(backoff)); 268 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 269 skipPersistence(); 270 throw new ProcedureSuspendedException(); 271 } 272 273 @Override 274 protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state) 275 throws ProcedureSuspendedException { 276 switch (state) { 277 case PRE_PEER_MODIFICATION: 278 try { 279 checkPeerModificationEnabled(env); 280 prePeerModification(env); 281 } catch (IOException e) { 282 LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " 283 + "mark the procedure as failure and give up", getClass().getName(), peerId, e); 284 setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e); 285 releaseLatch(); 286 return Flow.NO_MORE_STATE; 287 } catch (ReplicationException e) { 288 throw suspend(env.getMasterConfiguration(), 289 backoff -> LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs", 290 getClass().getName(), peerId, backoff / 1000, e)); 291 } 292 retryCounter = null; 293 setNextState(PeerModificationState.UPDATE_PEER_STORAGE); 294 return Flow.HAS_MORE_STATE; 295 case UPDATE_PEER_STORAGE: 296 try { 297 updatePeerStorage(env); 298 } catch (ReplicationException e) { 299 throw suspend(env.getMasterConfiguration(), 300 backoff -> LOG.warn("{} update peer storage for peer {} failed, sleep {} secs", 301 getClass().getName(), peerId, backoff / 1000, e)); 302 } 303 retryCounter = null; 304 setNextState(PeerModificationState.REFRESH_PEER_ON_RS); 305 return Flow.HAS_MORE_STATE; 306 case REFRESH_PEER_ON_RS: 307 refreshPeer(env, getPeerOperationType()); 308 setNextState(nextStateAfterRefresh()); 309 return Flow.HAS_MORE_STATE; 310 case SERIAL_PEER_REOPEN_REGIONS: 311 try { 312 reopenRegions(env); 313 } catch (Exception e) { 314 throw suspend(env.getMasterConfiguration(), 315 backoff -> LOG.warn("{} reopen regions for peer {} failed, sleep {} secs", 316 getClass().getName(), peerId, backoff / 1000, e)); 317 } 318 retryCounter = null; 319 setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID); 320 return Flow.HAS_MORE_STATE; 321 case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID: 322 try { 323 updateLastPushedSequenceIdForSerialPeer(env); 324 } catch (Exception e) { 325 throw suspend(env.getMasterConfiguration(), 326 backoff -> LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs", 327 getClass().getName(), peerId, backoff / 1000, e)); 328 } 329 retryCounter = null; 330 setNextState(enablePeerBeforeFinish() 331 ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED 332 : PeerModificationState.POST_PEER_MODIFICATION); 333 return Flow.HAS_MORE_STATE; 334 case SERIAL_PEER_SET_PEER_ENABLED: 335 try { 336 enablePeer(env); 337 } catch (ReplicationException e) { 338 throw suspend(env.getMasterConfiguration(), 339 backoff -> LOG.warn("{} enable peer before finish for peer {} failed, sleep {} secs", 340 getClass().getName(), peerId, backoff / 1000, e)); 341 } 342 retryCounter = null; 343 setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS); 344 return Flow.HAS_MORE_STATE; 345 case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS: 346 refreshPeer(env, PeerOperationType.ENABLE); 347 setNextState(PeerModificationState.POST_PEER_MODIFICATION); 348 return Flow.HAS_MORE_STATE; 349 case POST_PEER_MODIFICATION: 350 try { 351 postPeerModification(env); 352 } catch (ReplicationException e) { 353 throw suspend(env.getMasterConfiguration(), 354 backoff -> LOG.warn( 355 "{} failed to call postPeerModification for peer {}, sleep {} secs", 356 getClass().getName(), peerId, backoff / 1000, e)); 357 } catch (IOException e) { 358 LOG.warn("{} failed to call post CP hook for peer {}, " 359 + "ignore since the procedure has already done", getClass().getName(), peerId, e); 360 } 361 releaseLatch(); 362 return Flow.NO_MORE_STATE; 363 default: 364 throw new UnsupportedOperationException("unhandled state=" + state); 365 } 366 } 367 368 @Override 369 protected void rollbackState(MasterProcedureEnv env, PeerModificationState state) 370 throws IOException, InterruptedException { 371 if (state == PeerModificationState.PRE_PEER_MODIFICATION) { 372 // actually the peer related operations has no rollback, but if we haven't done any 373 // modifications on the peer storage yet, we can just return. 374 return; 375 } 376 throw new UnsupportedOperationException(); 377 } 378 379 @Override 380 protected PeerModificationState getState(int stateId) { 381 return PeerModificationState.forNumber(stateId); 382 } 383 384 @Override 385 protected int getStateId(PeerModificationState state) { 386 return state.getNumber(); 387 } 388 389 @Override 390 protected PeerModificationState getInitialState() { 391 return PeerModificationState.PRE_PEER_MODIFICATION; 392 } 393}