/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupHFileCleaner;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupObserver;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.master.BackupLogCleaner;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handles backup requests: creates backup info records in the backup system table to keep track
 * of backup sessions, and dispatches backup requests.
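 * <p>
 * A minimal usage sketch (illustrative only: the backup id, table name and root directory below
 * are placeholders, and error handling is omitted):
 *
 * <pre>{@code
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *   BackupManager manager = new BackupManager(conn, conf)) {
 *   manager.initialize(); // fails if another backup session is in progress
 *   BackupInfo info = manager.createBackupInfo("backup_1700000000000", BackupType.FULL,
 *     Collections.singletonList(TableName.valueOf("t1")), "hdfs://nn/backup", 3, 100L, false);
 *   manager.startBackupSession();
 *   try {
 *     // ... drive the backup using the created BackupInfo ...
 *   } finally {
 *     manager.finishBackupSession();
 *   }
 * }
 * }</pre>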
 */
@InterfaceAudience.Private
public class BackupManager implements Closeable {
  // Timeout (in seconds) for acquiring the backup exclusive operation lock
  public static final String BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY =
    "hbase.backup.exclusive.op.timeout.seconds";
  private static final int DEFAULT_BACKUP_EXCLUSIVE_OPERATION_TIMEOUT = 3600; // one hour
  private static final Logger LOG = LoggerFactory.getLogger(BackupManager.class);

  protected Configuration conf = null;
  protected BackupInfo backupInfo = null;
  protected BackupSystemTable systemTable;
  protected final Connection conn;

  /**
   * Backup manager constructor.
   * @param conn connection
   * @param conf configuration
   * @throws IOException if backup is not enabled or the backup system table cannot be initialized
   */
  public BackupManager(Connection conn, Configuration conf) throws IOException {
    if (
      !conf.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
        BackupRestoreConstants.BACKUP_ENABLE_DEFAULT)
    ) {
      throw new BackupException("HBase backup is not enabled. Check your "
        + BackupRestoreConstants.BACKUP_ENABLE_KEY + " setting.");
    }
    this.conf = conf;
    this.conn = conn;
    this.systemTable = new BackupSystemTable(conn);
  }

  /**
   * Returns backup info
   */
  protected BackupInfo getBackupInfo() {
    return backupInfo;
  }

  /**
   * Modifies the master's configuration to inject backup-related features (tests only).
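   * <p>
   * Illustratively, assuming none of these plugins are configured yet, the call appends entries
   * like the following (key names taken from the constants referenced below; class names
   * abbreviated):
   *
   * <pre>
   * hbase.master.logcleaner.plugins    += BackupLogCleaner
   * hbase.procedure.master.classes     += LogRollMasterProcedureManager
   * hbase.master.hfilecleaner.plugins  += BackupHFileCleaner
   * </pre>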
   * @param conf configuration
   */
  public static void decorateMasterConfiguration(Configuration conf) {
    if (!isBackupEnabled(conf)) {
      return;
    }
    // Add WAL archive cleaner plug-in
    String plugins = conf.get(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
    String cleanerClass = BackupLogCleaner.class.getCanonicalName();
    if (!plugins.contains(cleanerClass)) {
      conf.set(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
    }

    String classes = conf.get(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY);
    String masterProcedureClass = LogRollMasterProcedureManager.class.getName();
    if (classes == null) {
      conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY, masterProcedureClass);
    } else if (!classes.contains(masterProcedureClass)) {
      conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY,
        classes + "," + masterProcedureClass);
    }

    plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
    conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
      (plugins == null ? "" : plugins + ",") + BackupHFileCleaner.class.getName());
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Added log cleaner: {}. Added master procedure manager: {}. Added hfile cleaner: {}",
        cleanerClass, masterProcedureClass, BackupHFileCleaner.class.getName());
    }
  }

  /**
   * Modifies the region server configuration to inject backup-related features (tests only).
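   * <p>
   * Illustratively, assuming none of these classes are configured yet, the call appends entries
   * like the following (key names taken from the constants referenced below; class names
   * abbreviated):
   *
   * <pre>
   * hbase.procedure.regionserver.classes += LogRollRegionServerProcedureManager
   * hbase.coprocessor.region.classes     += BackupObserver
   * </pre>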
   * @param conf configuration
   */
  public static void decorateRegionServerConfiguration(Configuration conf) {
    if (!isBackupEnabled(conf)) {
      return;
    }

    String classes = conf.get(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY);
    String regionProcedureClass = LogRollRegionServerProcedureManager.class.getName();
    if (classes == null) {
      conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, regionProcedureClass);
    } else if (!classes.contains(regionProcedureClass)) {
      conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY,
        classes + "," + regionProcedureClass);
    }
    String coproc = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
    String regionObserverClass = BackupObserver.class.getName();
    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      (coproc == null ? "" : coproc + ",") + regionObserverClass);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Added region procedure manager: {}. Added region observer: {}",
        regionProcedureClass, regionObserverClass);
    }
  }

  public static boolean isBackupEnabled(Configuration conf) {
    return conf.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
      BackupRestoreConstants.BACKUP_ENABLE_DEFAULT);
  }

  /**
   * Get configuration
   */
  Configuration getConf() {
    return conf;
  }

  /**
   * Stops all backup work and closes the backup system table.
   */
  @Override
  public void close() {
    if (systemTable != null) {
      try {
        systemTable.close();
      } catch (Exception e) {
        LOG.error(e.toString(), e);
      }
    }
  }

  /**
   * Creates a backup info based on the input backup request.
   * @param backupId         backup id
   * @param type             backup type (full or incremental)
   * @param tableList        list of tables to back up
   * @param targetRootDir    root directory of the backup destination
   * @param workers          number of parallel workers
   * @param bandwidth        bandwidth per worker in MB per second
   * @param noChecksumVerify whether to skip checksum verification of backed-up files
   * @throws BackupException if the target root directory is not specified
   */
  public BackupInfo createBackupInfo(String backupId, BackupType type, List<TableName> tableList,
    String targetRootDir, int workers, long bandwidth, boolean noChecksumVerify)
    throws BackupException {
    if (targetRootDir == null) {
      throw new BackupException("Wrong backup request parameter: target backup root directory");
    }

    if (type == BackupType.FULL && (tableList == null || tableList.isEmpty())) {
      // A null or empty table list for a full backup means "back up all tables". Fill the table
      // list with all user tables from meta. If no table is available, throw an exception.
      List<TableDescriptor> htds = null;
      try (Admin admin = conn.getAdmin()) {
        htds = admin.listTableDescriptors();
      } catch (Exception e) {
        throw new BackupException(e);
      }

      if (htds == null) {
        throw new BackupException("No table exists for full backup of all tables.");
      } else {
        tableList = new ArrayList<>();
        for (TableDescriptor hTableDescriptor : htds) {
          TableName tn = hTableDescriptor.getTableName();
          if (tn.equals(BackupSystemTable.getTableName(conf))) {
            // skip backup system table
            continue;
          }
          tableList.add(tn);
        }

        LOG.info("Full backup of all tables available in the cluster: {}", tableList);
      }
    }

    // there are one or more tables in the table list
    backupInfo = new BackupInfo(backupId, type, tableList.toArray(new TableName[tableList.size()]),
      targetRootDir);
    backupInfo.setBandwidth(bandwidth);
    backupInfo.setWorkers(workers);
    backupInfo.setNoChecksumVerify(noChecksumVerify);
    return backupInfo;
  }

  /**
   * Checks if there is an ongoing backup. Currently, we rely only on the status recorded in the
   * backup system table. We need to consider handling orphan records in the future; otherwise,
   * all incoming requests will fail.
   * @return the id of the ongoing backup if one exists, otherwise null
   * @throws IOException exception
   */
  private String getOngoingBackupId() throws IOException {
    ArrayList<BackupInfo> sessions = systemTable.getBackupInfos(BackupState.RUNNING);
    if (sessions.isEmpty()) {
      return null;
    }
    return sessions.get(0).getBackupId();
  }

  /**
   * Starts the backup manager service. Fails if there is an ongoing backup session.
   * @throws IOException exception
   */
  public void initialize() throws IOException {
    String ongoingBackupId = this.getOngoingBackupId();
    if (ongoingBackupId != null) {
      LOG.info("There is an ongoing backup {}. Cannot launch a new backup until it finishes.",
        ongoingBackupId);
      throw new BackupException("There is an ongoing backup session.");
    }
  }

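  /**
   * Sets the backup info for the current backup session.
   */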
  public void setBackupInfo(BackupInfo backupInfo) {
    this.backupInfo = backupInfo;
  }

  /*
   * backup system table operations
   */

  /**
   * Updates the status (state) of a backup session in the backup system table.
   * @param context backup info to persist
   * @throws IOException exception
   */
  public void updateBackupInfo(BackupInfo context) throws IOException {
    systemTable.updateBackupInfo(context);
  }

  /**
   * Starts a new backup session by acquiring the exclusive lock on the backup system table.
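   * <p>
   * The call retries once per second until the lock is acquired; the total wait is bounded by
   * the value (in seconds) of {@code hbase.backup.exclusive.op.timeout.seconds}, default 3600.
   * For example, to cap the wait at five minutes:
   *
   * <pre>{@code
   * conf.setInt(BackupManager.BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY, 300);
   * }</pre>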
   * @throws IOException if an active session already exists or the lock cannot be acquired
   */
  public void startBackupSession() throws IOException {
    long startTime = EnvironmentEdgeManager.currentTime();
    long timeout = conf.getInt(BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY,
      DEFAULT_BACKUP_EXCLUSIVE_OPERATION_TIMEOUT) * 1000L;
    long lastWarningOutputTime = 0;
    while (EnvironmentEdgeManager.currentTime() - startTime < timeout) {
      try {
        systemTable.startBackupExclusiveOperation();
        return;
      } catch (IOException e) {
        if (e instanceof ExclusiveOperationException) {
          // sleep, then repeat
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e1) {
            // Restore the interrupted status
            Thread.currentThread().interrupt();
          }
          if (
            lastWarningOutputTime == 0
              || (EnvironmentEdgeManager.currentTime() - lastWarningOutputTime) > 60000
          ) {
            lastWarningOutputTime = EnvironmentEdgeManager.currentTime();
            LOG.warn("Waiting to acquire backup exclusive lock for {}s",
              (lastWarningOutputTime - startTime) / 1000);
          }
        } else {
          throw e;
        }
      }
    }
    throw new IOException(
      "Failed to acquire backup system table exclusive lock after " + timeout / 1000 + "s");
  }

  /**
   * Finishes the active backup session by releasing the exclusive lock.
   * @throws IOException if no active session
   */
  public void finishBackupSession() throws IOException {
    systemTable.finishBackupExclusiveOperation();
  }

  /**
   * Reads the start code (timestamp) of the last successful backup. Returns null if there is no
   * start code stored in the backup system table or the value has length 0; both cases indicate
   * that no backup has completed successfully so far.
   * @return the timestamp of the last successful backup, or null if none exists
   * @throws IOException exception
   */
  public String readBackupStartCode() throws IOException {
    return systemTable.readBackupStartCode(backupInfo.getBackupRootDir());
  }

  /**
   * Writes the start code (timestamp) to the backup system table. If null is passed in, a
   * zero-length value is written.
   * @param startCode start code
   * @throws IOException exception
   */
  public void writeBackupStartCode(Long startCode) throws IOException {
    systemTable.writeBackupStartCode(startCode, backupInfo.getBackupRootDir());
  }

  /**
   * Gets each region server's log information recorded after the last log roll from the backup
   * system table.
   * @return map from region server name to the timestamp of its last log roll
   * @throws IOException exception
   */
  public HashMap<String, Long> readRegionServerLastLogRollResult() throws IOException {
    return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
  }

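  /**
   * Reads bulk load records for the given tables from the backup system table.
   * @param tableList tables to query
   * @return list of bulk load records
   * @throws IOException exception
   */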
  public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
    return systemTable.readBulkloadRows(tableList);
  }

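  /**
   * Deletes the given bulk load records from the backup system table.
   * @param rows row keys of the records to delete
   * @throws IOException exception
   */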
  public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
    systemTable.deleteBulkLoadedRows(rows);
  }

  /**
   * Gets all completed backup information (in descending order by time).
   * @return list of completed {@link BackupInfo} records
   * @throws IOException exception
   */
  public List<BackupInfo> getBackupHistory() throws IOException {
    return systemTable.getBackupHistory();
  }

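  /**
   * Gets backup history.
   * @param completed if true, restricts the history to completed backup sessions
   * @throws IOException exception
   */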
  public ArrayList<BackupInfo> getBackupHistory(boolean completed) throws IOException {
    return systemTable.getBackupHistory(completed);
  }

  /**
   * Writes the current timestamps for each region server to the backup system table after a
   * successful full or incremental backup. Each table may have a different set of log timestamps.
   * The saved timestamp is that of the last log file already backed up.
   * @param tables        tables covered by this backup
   * @param newTimestamps map from region server name to the timestamp of its last backed-up log
   * @throws IOException exception
   */
  public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps)
    throws IOException {
    systemTable.writeRegionServerLogTimestamp(tables, newTimestamps, backupInfo.getBackupRootDir());
  }

  /**
   * Reads the timestamp of each region server log after the last successful backup. Each table
   * has its own set of timestamps.
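   * <p>
   * Shape of the returned map (server names and timestamps below are illustrative):
   *
   * <pre>{@code
   * { t1 -> { "rs1:16020" -> 1700000000000L, "rs2:16020" -> 1700000001000L } }
   * }</pre>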
   * @return map from table name to a map of region server name to the timestamp of the last
   *         backed-up log
   * @throws IOException exception
   */
  public Map<TableName, Map<String, Long>> readLogTimestampMap() throws IOException {
    return systemTable.readLogTimestampMap(backupInfo.getBackupRootDir());
  }

  /**
   * Returns the set of tables currently covered by incremental backup.
   * @return set of table names
   * @throws IOException exception
   */
  public Set<TableName> getIncrementalBackupTableSet() throws IOException {
    return systemTable.getIncrementalBackupTableSet(backupInfo.getBackupRootDir());
  }

  /**
   * Adds a set of tables to the overall incremental backup table set.
   * @param tables tables to add
   * @throws IOException exception
   */
  public void addIncrementalBackupTableSet(Set<TableName> tables) throws IOException {
    systemTable.addIncrementalBackupTableSet(tables, backupInfo.getBackupRootDir());
  }

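  /**
   * Returns the connection used by this backup manager.
   */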
  public Connection getConnection() {
    return conn;
  }
}