001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup.master; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.concurrent.ThreadPoolExecutor; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.ServerName; 026import org.apache.hadoop.hbase.backup.BackupRestoreConstants; 027import org.apache.hadoop.hbase.backup.impl.BackupManager; 028import org.apache.hadoop.hbase.errorhandling.ForeignException; 029import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 030import org.apache.hadoop.hbase.master.MasterServices; 031import org.apache.hadoop.hbase.master.MetricsMaster; 032import org.apache.hadoop.hbase.procedure.MasterProcedureManager; 033import org.apache.hadoop.hbase.procedure.Procedure; 034import org.apache.hadoop.hbase.procedure.ProcedureCoordinationManager; 035import org.apache.hadoop.hbase.procedure.ProcedureCoordinator; 036import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; 037import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; 038import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinationManager; 039import org.apache.hadoop.hbase.security.User; 040import org.apache.hadoop.hbase.security.access.AccessChecker; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 048 049/** 050 * Master procedure manager for coordinated cluster-wide WAL roll operation, which is run during 051 * backup operation, see {@link MasterProcedureManager} and {@link RegionServerProcedureManager} 052 */ 053@InterfaceAudience.Private 054public class LogRollMasterProcedureManager extends MasterProcedureManager { 055 private static final Logger LOG = LoggerFactory.getLogger(LogRollMasterProcedureManager.class); 056 057 public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc"; 058 public static final String ROLLLOG_PROCEDURE_NAME = "rolllog"; 059 public static final String BACKUP_WAKE_MILLIS_KEY = "hbase.backup.logroll.wake.millis"; 060 public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.logroll.timeout.millis"; 061 public static final String BACKUP_POOL_THREAD_NUMBER_KEY = 062 "hbase.backup.logroll.pool.thread.number"; 063 064 public static final int BACKUP_WAKE_MILLIS_DEFAULT = 500; 065 public static final int BACKUP_TIMEOUT_MILLIS_DEFAULT = 180000; 066 public static final int BACKUP_POOL_THREAD_NUMBER_DEFAULT = 8; 067 private MasterServices master; 068 private ProcedureCoordinator coordinator; 069 private boolean done; 070 071 @Override 072 public void stop(String why) { 073 LOG.info("stop: " + why); 074 } 075 076 @Override 077 public boolean isStopped() { 078 return false; 079 } 080 081 @Override 082 public void initialize(MasterServices master, MetricsMaster metricsMaster) 083 throws IOException, UnsupportedOperationException { 084 this.master = master; 085 this.done = false; 086 087 // setup the default procedure coordinator 088 String name = master.getServerName().toString(); 089 090 // get the configuration for the coordinator 091 Configuration conf = master.getConfiguration(); 092 long wakeFrequency = conf.getInt(BACKUP_WAKE_MILLIS_KEY, BACKUP_WAKE_MILLIS_DEFAULT); 093 long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT); 094 int opThreads = conf.getInt(BACKUP_POOL_THREAD_NUMBER_KEY, BACKUP_POOL_THREAD_NUMBER_DEFAULT); 095 096 // setup the default procedure coordinator 097 ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads); 098 ProcedureCoordinationManager coordManager = new ZKProcedureCoordinationManager(master); 099 ProcedureCoordinatorRpcs comms = 100 coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name); 101 this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); 102 103 } 104 105 @Override 106 public String getProcedureSignature() { 107 return ROLLLOG_PROCEDURE_SIGNATURE; 108 } 109 110 @Override 111 public void execProcedure(ProcedureDescription desc) throws IOException { 112 if (!isBackupEnabled()) { 113 LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY 114 + " setting"); 115 return; 116 } 117 this.done = false; 118 // start the process on the RS 119 ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance()); 120 List<ServerName> serverNames = master.getServerManager().getOnlineServersList(); 121 List<String> servers = new ArrayList<>(); 122 for (ServerName sn : serverNames) { 123 servers.add(sn.toString()); 124 } 125 126 List<NameStringPair> conf = desc.getConfigurationList(); 127 byte[] data = new byte[0]; 128 if (conf.size() > 0) { 129 // Get backup root path 130 data = Bytes.toBytes(conf.get(0).getValue()); 131 } 132 Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), data, servers); 133 if (proc == null) { 134 String msg = "Failed to submit distributed procedure for '" + desc.getInstance() + "'"; 135 LOG.error(msg); 136 throw new IOException(msg); 137 } 138 139 try { 140 // wait for the procedure to complete. A timer thread is kicked off that should cancel this 141 // if it takes too long. 142 proc.waitForCompleted(); 143 LOG.info("Done waiting - exec procedure for " + desc.getInstance()); 144 LOG.info("Distributed roll log procedure is successful!"); 145 this.done = true; 146 } catch (InterruptedException e) { 147 ForeignException ee = 148 new ForeignException("Interrupted while waiting for roll log procdure to finish", e); 149 monitor.receive(ee); 150 Thread.currentThread().interrupt(); 151 } catch (ForeignException e) { 152 ForeignException ee = 153 new ForeignException("Exception while waiting for roll log procdure to finish", e); 154 monitor.receive(ee); 155 } 156 monitor.rethrowException(); 157 } 158 159 @Override 160 public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user) 161 throws IOException { 162 // TODO: what permissions checks are needed here? 163 } 164 165 private boolean isBackupEnabled() { 166 return BackupManager.isBackupEnabled(master.getConfiguration()); 167 } 168 169 @Override 170 public boolean isProcedureDone(ProcedureDescription desc) { 171 return done; 172 } 173}