/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_CLEAN_UP;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
 * A procedure for migrating replication queue data from zookeeper to hbase:replication table.
 * <p>
 * The states are executed in this order (see {@code executeFromState}):
 * <ol>
 * <li>DISABLE_CLEANER: disable the replication log cleaner through its barrier.</li>
 * <li>PREPARE: wait until there is no unfinished peer procedure and record the ids of the peers
 * which are currently enabled. If there is no peer at all, jump to WAIT_UPGRADING (when the
 * region replica replication peer still exists) or directly to ENABLE_CLEANER.</li>
 * <li>DISABLE_PEER: schedule a {@link DisablePeerProcedure} for every recorded peer.</li>
 * <li>MIGRATE: copy the queue data from zookeeper, using a private executor.</li>
 * <li>WAIT_UPGRADING: wait until no online region server reports a major version lower than
 * {@link #MIN_MAJOR_VERSION}.</li>
 * <li>ENABLE_PEER: schedule an {@link EnablePeerProcedure} for every recorded peer.</li>
 * <li>ENABLE_CLEANER: enable the replication log cleaner again.</li>
 * <li>CLEAN_UP: delete the old data on zookeeper.</li>
 * </ol>
 * Rollback is not supported.
 */
@InterfaceAudience.Private
public class MigrateReplicationQueueFromZkToTableProcedure
  extends StateMachineProcedure<MasterProcedureEnv, MigrateReplicationQueueFromZkToTableState>
  implements GlobalProcedureInterface {

  private static final Logger LOG =
    LoggerFactory.getLogger(MigrateReplicationQueueFromZkToTableProcedure.class);

  /** Region servers reporting a major version below this value block the WAIT_UPGRADING state. */
  private static final int MIN_MAJOR_VERSION = 3;

  /**
   * Ids of the peers which were enabled when the PREPARE state ran. Persisted with the procedure
   * state data. May be {@code null}: it is only assigned in PREPARE, and when loading persisted
   * state it is only assigned if at least one id was stored.
   */
  private List<String> disabledPeerIds;

  /** The in-flight migration future, accessed by ProcedureFutureUtil through the accessors. */
  private CompletableFuture<Void> future;

  /** Lazily created executor used for the migration, see {@link #getExecutorService()}. */
  private ExecutorService executor;

  /** Lazily created by {@link #suspend(Configuration, LongConsumer)}, cleared on progress. */
  private RetryCounter retryCounter;

  /**
   * Returns the simple class name as the global id of this procedure.
   */
  @Override
  public String getGlobalId() {
    return getClass().getSimpleName();
  }

  private CompletableFuture<Void> getFuture() {
    return future;
  }

  private void setFuture(CompletableFuture<Void> f) {
    future = f;
  }

  /**
   * Suspend this procedure with a backoff taken from the retry counter.
   * <p>
   * This method never returns normally. It is declared to return the exception only so that
   * callers can write {@code throw suspend(...)} and keep the compiler's flow analysis happy.
   * @param conf            used to create the retry counter if there is none yet
   * @param backoffConsumer receives the backoff in milliseconds before suspending, used by the
   *                        callers for logging
   * @throws ProcedureSuspendedException always
   * @throws ArithmeticException         if the backoff does not fit into an int
   */
  private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer)
    throws ProcedureSuspendedException {
    if (retryCounter == null) {
      retryCounter = ProcedureUtil.createRetryCounter(conf);
    }
    long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
    backoffConsumer.accept(backoff);
    throw suspend(Math.toIntExact(backoff), true);
  }

  /**
   * Drop the retry counter so that the next suspend starts with a fresh backoff.
   */
  private void resetRetry() {
    retryCounter = null;
  }

  /**
   * Returns the executor used for the migration, creating a cached pool of daemon threads named
   * after this class on first use.
   */
  private ExecutorService getExecutorService() {
    if (executor == null) {
      executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
        .setNameFormat(getClass().getSimpleName() + "-%d").setDaemon(true).build());
    }
    return executor;
  }

  /**
   * Shut down the migration executor, if any, and forget it.
   */
  private void shutdownExecutorService() {
    if (executor != null) {
      executor.shutdown();
      executor = null;
    }
  }

  /**
   * Try to disable the replication log cleaner through its barrier, suspend and retry later if
   * the barrier refuses.
   * @throws ProcedureSuspendedException if the cleaner can not be disabled right now
   */
  private void disableReplicationLogCleaner(MasterProcedureEnv env)
    throws ProcedureSuspendedException {
    if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
      // it is not likely that we can reach here as we will schedule this procedure immediately
      // after master restarting, where ReplicationLogCleaner should have not started its first run
      // yet. But anyway, let's make the code more robust. And it is safe to wait a bit here since
      // there will be no data in the new replication queue storage before we execute this procedure
      // so ReplicationLogCleaner will quit immediately without doing anything.
      throw suspend(env.getMasterConfiguration(),
        backoff -> LOG.info(
          "Can not disable replication log cleaner, sleep {} secs and retry later",
          backoff / 1000));
    }
    resetRetry();
  }

  /**
   * Enable the replication log cleaner again through its barrier.
   */
  private void enableReplicationLogCleaner(MasterProcedureEnv env) {
    env.getMasterServices().getReplicationLogCleanerBarrier().enable();
  }

  /**
   * Suspend until the master has no unfinished procedure implementing
   * {@link PeerProcedureInterface}.
   * @throws ProcedureSuspendedException if listing the procedures failed or there are still
   *                                     unfinished peer procedures
   */
  private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSuspendedException {
    long peerProcCount;
    try {
      peerProcCount = env.getMasterServices().getProcedures().stream()
        .filter(p -> p instanceof PeerProcedureInterface).filter(p -> !p.isFinished()).count();
    } catch (IOException e) {
      throw suspend(env.getMasterConfiguration(),
        backoff -> LOG.warn("failed to check peer procedure status, sleep {} secs and retry later",
          backoff / 1000, e));
    }
    if (peerProcCount > 0) {
      throw suspend(env.getMasterConfiguration(),
        backoff -> LOG.info(
          "There are still {} pending peer procedures, sleep {} secs and retry later",
          peerProcCount, backoff / 1000));
    }
    resetRetry();
    LOG.info("No pending peer procedures found, continue...");
  }

  /**
   * Completion action handed to ProcedureFutureUtil in the MIGRATE state: releases the executor
   * and moves on to WAIT_UPGRADING with a fresh retry counter.
   */
  private void finishMigration() {
    shutdownExecutorService();
    setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
    resetRetry();
  }

  /**
   * Delete all the data of the old zookeeper based queue storage and the legacy region replica
   * replication peer.
   * @throws ProcedureSuspendedException if the deletion failed and should be retried later
   */
  private void cleanup(MasterProcedureEnv env) throws ProcedureSuspendedException {
    ZKReplicationQueueStorageForMigration oldStorage = new ZKReplicationQueueStorageForMigration(
      env.getMasterServices().getZooKeeper(), env.getMasterConfiguration());
    try {
      oldStorage.deleteAllData();
      env.getReplicationPeerManager().deleteLegacyRegionReplicaReplicationPeer();
    } catch (KeeperException | ReplicationException e) {
      throw suspend(env.getMasterConfiguration(),
        backoff -> LOG.warn(
          "failed to delete old replication queue data, sleep {} secs and retry later",
          backoff / 1000, e));
    }
  }

  /**
   * Execute one step of the migration, see the class comment for the order of the states.
   * @return {@link Flow#NO_MORE_STATE} after CLEAN_UP, {@link Flow#HAS_MORE_STATE} otherwise
   * @throws ProcedureSuspendedException   if the current step has to be retried later
   * @throws UnsupportedOperationException if {@code state} is not handled
   */
  @Override
  protected Flow executeFromState(MasterProcedureEnv env,
    MigrateReplicationQueueFromZkToTableState state)
    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
    switch (state) {
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER:
        disableReplicationLogCleaner(env);
        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE);
        return Flow.HAS_MORE_STATE;
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE:
        waitUntilNoPeerProcedure(env);
        List<ReplicationPeerDescription> peers = env.getReplicationPeerManager().listPeers(null);
        if (peers.isEmpty()) {
          // we will not load the region_replica_replication peer, so here we need to check the
          // storage directly
          try {
            if (env.getReplicationPeerManager().hasRegionReplicaReplicationPeer()) {
              LOG.info(
                "No active replication peer found but we still have '{}' peer, need to "
                  + "wait until all region servers are upgraded",
                ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
              setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
              return Flow.HAS_MORE_STATE;
            }
          } catch (ReplicationException e) {
            throw suspend(env.getMasterConfiguration(), backoff -> LOG
              .warn("failed to list peer ids, sleep {} secs and retry later", backoff / 1000, e));
          }
          LOG.info("No active replication peer found, just clean up all replication queue data");
          setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
          return Flow.HAS_MORE_STATE;
        }
        // only remember the peers which are enabled now, so that ENABLE_PEER will not enable a
        // peer which was already disabled before the migration
        disabledPeerIds = peers.stream().filter(ReplicationPeerDescription::isEnabled)
          .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList());
        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER);
        resetRetry();
        return Flow.HAS_MORE_STATE;
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER:
        // disabledPeerIds is null if this procedure was loaded from persisted state data which
        // contained no peer id, see deserializeStateData, so guard the same way as ENABLE_PEER
        if (CollectionUtils.isNotEmpty(disabledPeerIds)) {
          for (String peerId : disabledPeerIds) {
            addChildProcedure(new DisablePeerProcedure(peerId));
          }
        }
        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE);
        return Flow.HAS_MORE_STATE;
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE:
        try {
          // NOTE(review): presumably checkFuture returns true when a previously stored future has
          // been handled (running finishMigration on success) - confirm against
          // ProcedureFutureUtil
          if (
            ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
              this::finishMigration)
          ) {
            return Flow.HAS_MORE_STATE;
          }
          ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
            env.getReplicationPeerManager()
              .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService()),
            env, this::finishMigration);
        } catch (IOException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn("failed to migrate queue data, sleep {} secs and retry later",
              backoff / 1000, e));
        }
        return Flow.HAS_MORE_STATE;
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING:
        long rsWithLowerVersion =
          env.getMasterServices().getServerManager().getOnlineServers().values().stream()
            .filter(sm -> VersionInfo.getMajorVersion(sm.getVersion()) < MIN_MAJOR_VERSION).count();
        if (rsWithLowerVersion == 0) {
          // start the following states with a fresh backoff, like the other waiting states do
          resetRetry();
          setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER);
          return Flow.HAS_MORE_STATE;
        } else {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "There are still {} region servers which have a major version"
                + " less than {}, sleep {} secs and check later",
              rsWithLowerVersion, MIN_MAJOR_VERSION, backoff / 1000));
        }
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER:
        if (CollectionUtils.isNotEmpty(disabledPeerIds)) {
          for (String peerId : disabledPeerIds) {
            addChildProcedure(new EnablePeerProcedure(peerId));
          }
        }
        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
        return Flow.HAS_MORE_STATE;
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER:
        enableReplicationLogCleaner(env);
        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_CLEAN_UP);
        return Flow.HAS_MORE_STATE;
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_CLEAN_UP:
        // this is mainly for deleting the region replica replication queue data, but anyway, since
        // we should have migrated all data, here we can simply delete everything
        cleanup(env);
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
  }

  /**
   * Called when the suspend timeout fires: mark this procedure as runnable again and put it at
   * the front of the scheduler queue.
   * @return always {@code false}; NOTE(review): presumably this tells the framework that the
   *         timeout is not a failure - confirm against Procedure#setTimeoutFailure
   */
  @Override
  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
    setState(ProcedureProtos.ProcedureState.RUNNABLE);
    env.getProcedureScheduler().addFront(this);
    return false;
  }

  /**
   * Rollback is not supported for this procedure.
   * @throws UnsupportedOperationException always
   */
  @Override
  protected void rollbackState(MasterProcedureEnv env,
    MigrateReplicationQueueFromZkToTableState state) throws IOException, InterruptedException {
    throw new UnsupportedOperationException();
  }

  @Override
  protected MigrateReplicationQueueFromZkToTableState getState(int stateId) {
    return MigrateReplicationQueueFromZkToTableState.forNumber(stateId);
  }

  @Override
  protected int getStateId(MigrateReplicationQueueFromZkToTableState state) {
    return state.getNumber();
  }

  @Override
  protected MigrateReplicationQueueFromZkToTableState getInitialState() {
    return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
  }

  /**
   * Disable the replication log cleaner again after this procedure has been replayed, unless it
   * is still in its initial state.
   * @throws IllegalStateException if the log cleaner can not be disabled
   */
  @Override
  protected void afterReplay(MasterProcedureEnv env) {
    if (getCurrentState() == getInitialState()) {
      // do not need to disable log cleaner or acquire lock if we are in the initial state, later
      // when executing the procedure we will try to disable and acquire.
      return;
    }
    if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
      throw new IllegalStateException("can not disable log cleaner, this should not happen");
    }
  }

  /**
   * Persist the ids of the peers recorded in PREPARE. Nothing is added when the list is null or
   * empty.
   */
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.serializeStateData(serializer);
    MigrateReplicationQueueFromZkToTableStateData.Builder builder =
      MigrateReplicationQueueFromZkToTableStateData.newBuilder();
    if (CollectionUtils.isNotEmpty(disabledPeerIds)) {
      builder.addAllDisabledPeerId(disabledPeerIds);
    }
    serializer.serialize(builder.build());
  }

  /**
   * Restore the recorded peer ids. {@code disabledPeerIds} is left untouched, i.e. {@code null}
   * on a freshly created instance, when no id was persisted.
   */
  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.deserializeStateData(serializer);
    MigrateReplicationQueueFromZkToTableStateData data =
      serializer.deserialize(MigrateReplicationQueueFromZkToTableStateData.class);
    if (data.getDisabledPeerIdCount() > 0) {
      disabledPeerIds = data.getDisabledPeerIdList().stream().collect(Collectors.toList());
    }
  }
}