001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.snapshot;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.Set;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.Executors;
033import java.util.concurrent.ScheduledExecutorService;
034import java.util.concurrent.ScheduledFuture;
035import java.util.concurrent.ThreadPoolExecutor;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.locks.ReadWriteLock;
038import java.util.concurrent.locks.ReentrantReadWriteLock;
039import java.util.stream.Collectors;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FSDataInputStream;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.fs.permission.AclEntry;
046import org.apache.hadoop.fs.permission.AclStatus;
047import org.apache.hadoop.hbase.HBaseInterfaceAudience;
048import org.apache.hadoop.hbase.HConstants;
049import org.apache.hadoop.hbase.ServerName;
050import org.apache.hadoop.hbase.Stoppable;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.client.TableDescriptor;
053import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
054import org.apache.hadoop.hbase.client.TableState;
055import org.apache.hadoop.hbase.errorhandling.ForeignException;
056import org.apache.hadoop.hbase.executor.ExecutorService;
057import org.apache.hadoop.hbase.ipc.RpcServer;
058import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
059import org.apache.hadoop.hbase.master.MasterFileSystem;
060import org.apache.hadoop.hbase.master.MasterServices;
061import org.apache.hadoop.hbase.master.MetricsMaster;
062import org.apache.hadoop.hbase.master.SnapshotSentinel;
063import org.apache.hadoop.hbase.master.WorkerAssigner;
064import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
065import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
066import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
067import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
068import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
069import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
070import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
071import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure;
072import org.apache.hadoop.hbase.master.procedure.SnapshotVerifyProcedure;
073import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
074import org.apache.hadoop.hbase.procedure.Procedure;
075import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
076import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
077import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
078import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
079import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
080import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
081import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils;
082import org.apache.hadoop.hbase.security.AccessDeniedException;
083import org.apache.hadoop.hbase.security.User;
084import org.apache.hadoop.hbase.security.access.AccessChecker;
085import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclCleaner;
086import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper;
087import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
088import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
089import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
090import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
091import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
092import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
093import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
094import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
095import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
096import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
097import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
098import org.apache.hadoop.hbase.util.CommonFSUtils;
099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
100import org.apache.hadoop.hbase.util.NonceKey;
101import org.apache.hadoop.hbase.util.TableDescriptorChecker;
102import org.apache.yetus.audience.InterfaceAudience;
103import org.apache.yetus.audience.InterfaceStability;
104import org.apache.zookeeper.KeeperException;
105import org.slf4j.Logger;
106import org.slf4j.LoggerFactory;
107
108import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
109
110import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type;
115
116/**
117 * This class manages the procedure of taking and restoring snapshots. There is only one
118 * SnapshotManager for the master.
119 * <p>
120 * The class provides methods for monitoring in-progress snapshot actions.
121 * <p>
122 * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
123 * simplification in the current implementation.
124 */
125@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
126@InterfaceStability.Unstable
127public class SnapshotManager extends MasterProcedureManager implements Stoppable {
  private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);

  /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
  private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;

  /**
   * Conf key for the wait time before a finished sentinel is removed from the in-progress map.
   * <p>
   * NOTE: This is used as a safety auto cleanup. The snapshot and restore handler map entries are
   * removed when a user asks whether a snapshot or restore is completed. That operation is part of
   * the HBaseAdmin snapshot/restore API flow. In case something fails on the client side and the
   * snapshot/restore state is not reclaimed within the timeout, the entry is removed from the
   * in-progress map. From then on, if the user asks for the snapshot/restore status, the result is
   * "done" if the snapshot exists, or "failed" if it does not.
   */
  public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS =
    "hbase.snapshot.sentinels.cleanup.timeoutMillis";
  /** Default sentinel cleanup timeout: 60 seconds, expressed in milliseconds. */
  public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L;

  /** Conf key to enable or disable snapshot support */
  public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";

  /**
   * Conf key for # of ms elapsed between checks for snapshot errors while waiting for completion.
   */
  private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";

  /** Name of the operation to use in the controller */
  public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";

  /** Conf key for # of threads used by the SnapshotManager thread pool */
  public static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";

  /** Default # of threads used by the SnapshotManager thread pool */
  public static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;

  /** Conf key for preserving original max file size configs */
  public static final String SNAPSHOT_MAX_FILE_SIZE_PRESERVE =
    "hbase.snapshot.max.filesize.preserve";

  /** Conf key to enable or disable snapshot procedure */
  public static final String SNAPSHOT_PROCEDURE_ENABLED = "hbase.snapshot.procedure.enabled";

  /** Snapshot procedure is enabled unless configured otherwise. */
  public static final boolean SNAPSHOT_PROCEDURE_ENABLED_DEFAULT = true;

  // Stop flag for the Stoppable contract; it is not written anywhere in this part of the file.
  private boolean stopped;
  private MasterServices master; // Needed by TableEventHandlers
  private ProcedureCoordinator coordinator;

  // Is snapshot feature enabled?
  private boolean isSnapshotSupported = false;

  // Snapshot handlers map, with table name as key.
  // snapshotTable() inserts a handler into the map.
  // isSnapshotDone() removes the requested handler if the operation is finished.
  // NOTE(review): an earlier comment stated that the map is always accessed and modified under
  // the object lock using synchronized; the map is now a ConcurrentHashMap and isSnapshotDone()
  // is not itself synchronized, so confirm whether that invariant is still intended.
  private final Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
  // Single-threaded daemon scheduler; the testing constructor uses it to run cleanupSentinels()
  // at a fixed rate.
  private final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
      .setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build());
  // Handle of the periodic cleanupSentinels() task scheduled on scheduleThreadPool.
  private ScheduledFuture<?> snapshotHandlerChoreCleanerTask;

  // Restore map, with table name as key, procedure ID as value.
  // The map is always accessed and modified under the object lock using synchronized.
  // restoreSnapshot()/cloneSnapshot() will insert a procedure ID in the map.
  //
  // TODO: just as in the Apache HBase 1.x implementation, this map does not survive master
  // restart/failover. This is just a stopgap implementation until snapshots are taken using
  // Procedure-V2.
  private Map<TableName, Long> restoreTableToProcIdMap = new HashMap<>();

  // SnapshotDescription -> SnapshotProcId
  private final ConcurrentHashMap<SnapshotDescription, Long> snapshotToProcIdMap =
    new ConcurrentHashMap<>();

  // NOTE(review): presumably hands out workers for snapshot verification; it is not initialized
  // in this part of the file, so confirm against its usage.
  private WorkerAssigner verifyWorkerAssigner;

  // Root directory of the master file system (set from MasterFileSystem#getRootDir()).
  private Path rootDir;
  // Executor to which snapshot handlers are submitted.
  private ExecutorService executorService;

  /**
   * Read write lock between taking a snapshot and the snapshot HFile cleaner. The cleaner should
   * skip checking the HFiles if any snapshot is in progress, otherwise it may clean an HFile that
   * belongs to a snapshot that is still being created. So the cleaner should grab the write lock
   * first when it starts to work. (See HBASE-21387)
   */
  private ReentrantReadWriteLock takingSnapshotLock = new ReentrantReadWriteLock(true);
214
  /**
   * Creates a manager without wiring any collaborators: {@code master}, {@code coordinator},
   * {@code rootDir} and {@code executorService} are left unset by this constructor.
   * NOTE(review): presumably the instance is initialized later through another entry point that
   * is not visible here - confirm.
   */
  public SnapshotManager() {
  }
217
218  /**
219   * Fully specify all necessary components of a snapshot manager. Exposed for testing.
220   * @param master      services for the master where the manager is running
221   * @param coordinator procedure coordinator instance. exposed for testing.
222   * @param pool        HBase ExecutorServcie instance, exposed for testing.
223   */
224  @InterfaceAudience.Private
225  SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator,
226    ExecutorService pool, int sentinelCleanInterval)
227    throws IOException, UnsupportedOperationException {
228    this.master = master;
229
230    this.rootDir = master.getMasterFileSystem().getRootDir();
231    Configuration conf = master.getConfiguration();
232    checkSnapshotSupport(conf, master.getMasterFileSystem());
233
234    this.coordinator = coordinator;
235    this.executorService = pool;
236    resetTempDir();
237    snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate(
238      this::cleanupSentinels, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS);
239  }
240
241  /**
242   * Gets the list of all completed snapshots.
243   * @return list of SnapshotDescriptions
244   * @throws IOException File system exception
245   */
246  public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
247    return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir), true);
248  }
249
250  /**
251   * Gets the list of all completed snapshots.
252   * @param snapshotDir snapshot directory
253   * @param withCpCall  Whether to call CP hooks
254   * @return list of SnapshotDescriptions
255   * @throws IOException File system exception
256   */
257  private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir, boolean withCpCall)
258    throws IOException {
259    List<SnapshotDescription> snapshotDescs = new ArrayList<>();
260    // first create the snapshot root path and check to see if it exists
261    FileSystem fs = master.getMasterFileSystem().getFileSystem();
262    if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
263
264    // if there are no snapshots, return an empty list
265    if (!fs.exists(snapshotDir)) {
266      return snapshotDescs;
267    }
268
269    // ignore all the snapshots in progress
270    FileStatus[] snapshots = fs.listStatus(snapshotDir,
271      new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
272    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
273    withCpCall = withCpCall && cpHost != null;
274    // loop through all the completed snapshots
275    for (FileStatus snapshot : snapshots) {
276      Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
277      // if the snapshot is bad
278      if (!fs.exists(info)) {
279        LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
280        continue;
281      }
282      FSDataInputStream in = null;
283      try {
284        in = fs.open(info);
285        SnapshotDescription desc = SnapshotDescription.parseFrom(in);
286        org.apache.hadoop.hbase.client.SnapshotDescription descPOJO =
287          (withCpCall) ? ProtobufUtil.createSnapshotDesc(desc) : null;
288        if (withCpCall) {
289          try {
290            cpHost.preListSnapshot(descPOJO);
291          } catch (AccessDeniedException e) {
292            LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
293              + "Either you should be owner of this snapshot or admin user.");
294            // Skip this and try for next snapshot
295            continue;
296          }
297        }
298        snapshotDescs.add(desc);
299
300        // call coproc post hook
301        if (withCpCall) {
302          cpHost.postListSnapshot(descPOJO);
303        }
304      } catch (IOException e) {
305        LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
306      } finally {
307        if (in != null) {
308          in.close();
309        }
310      }
311    }
312    return snapshotDescs;
313  }
314
315  /**
316   * Cleans up any zk-coordinated snapshots in the snapshot/.tmp directory that were left from
317   * failed snapshot attempts. For unfinished procedure2-coordinated snapshots, keep the working
318   * directory.
319   * @throws IOException if we can't reach the filesystem
320   */
321  private void resetTempDir() throws IOException {
322    Set<String> workingProcedureCoordinatedSnapshotNames =
323      snapshotToProcIdMap.keySet().stream().map(s -> s.getName()).collect(Collectors.toSet());
324
325    Path tmpdir =
326      SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, master.getConfiguration());
327    FileSystem tmpFs = tmpdir.getFileSystem(master.getConfiguration());
328    FileStatus[] workingSnapshotDirs = CommonFSUtils.listStatus(tmpFs, tmpdir);
329    if (workingSnapshotDirs == null) {
330      return;
331    }
332    for (FileStatus workingSnapshotDir : workingSnapshotDirs) {
333      String workingSnapshotName = workingSnapshotDir.getPath().getName();
334      if (!workingProcedureCoordinatedSnapshotNames.contains(workingSnapshotName)) {
335        try {
336          if (tmpFs.delete(workingSnapshotDir.getPath(), true)) {
337            LOG.info("delete unfinished zk-coordinated snapshot working directory {}",
338              workingSnapshotDir.getPath());
339          } else {
340            LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}",
341              workingSnapshotDir.getPath());
342          }
343        } catch (IOException e) {
344          LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}",
345            workingSnapshotDir.getPath(), e);
346        }
347      } else {
348        LOG.debug("find working directory of unfinished procedure {}", workingSnapshotName);
349      }
350    }
351  }
352
353  /**
354   * Delete the specified snapshot
355   * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
356   * @throws IOException                   For filesystem IOExceptions
357   */
358  public void deleteSnapshot(SnapshotDescription snapshot) throws IOException {
359    // check to see if it is completed
360    if (!isSnapshotCompleted(snapshot)) {
361      throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot));
362    }
363
364    String snapshotName = snapshot.getName();
365    // first create the snapshot description and check to see if it exists
366    FileSystem fs = master.getMasterFileSystem().getFileSystem();
367    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
368    // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
369    // just the "name" and it does not contains the "real" snapshot information
370    snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
371
372    // call coproc pre hook
373    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
374    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
375    if (cpHost != null) {
376      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
377      cpHost.preDeleteSnapshot(snapshotPOJO);
378    }
379
380    LOG.debug("Deleting snapshot: " + snapshotName);
381    // delete the existing snapshot
382    if (!fs.delete(snapshotDir, true)) {
383      throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
384    }
385
386    // call coproc post hook
387    if (cpHost != null) {
388      cpHost.postDeleteSnapshot(snapshotPOJO);
389    }
390
391  }
392
393  /**
394   * Check if the specified snapshot is done
395   * @return true if snapshot is ready to be restored, false if it is still being taken.
396   * @throws IOException              IOException if error from HDFS or RPC
397   * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
398   */
399  public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
400    // check the request to make sure it has a snapshot
401    if (expected == null) {
402      throw new UnknownSnapshotException(
403        "No snapshot name passed in request, can't figure out which snapshot you want to check.");
404    }
405
406    Long procId = snapshotToProcIdMap.get(expected);
407    if (procId != null) {
408      if (master.getMasterProcedureExecutor().isRunning()) {
409        return master.getMasterProcedureExecutor().isFinished(procId);
410      } else {
411        return false;
412      }
413    }
414
415    String ssString = ClientSnapshotDescriptionUtils.toString(expected);
416
417    // check to see if the sentinel exists,
418    // and if the task is complete removes it from the in-progress snapshots map.
419    SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
420
421    // stop tracking "abandoned" handlers
422    cleanupSentinels();
423
424    if (handler == null) {
425      // If there's no handler in the in-progress map, it means one of the following:
426      // - someone has already requested the snapshot state
427      // - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
428      // - the snapshot was never requested
429      // In those cases returns to the user the "done state" if the snapshots exists on disk,
430      // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
431      if (!isSnapshotCompleted(expected)) {
432        throw new UnknownSnapshotException("Snapshot " + ssString
433          + " is not currently running or one of the known completed snapshots.");
434      }
435      // was done, return true;
436      return true;
437    }
438
439    // pass on any failure we find in the sentinel
440    try {
441      handler.rethrowExceptionIfFailed();
442    } catch (ForeignException e) {
443      // Give some procedure info on an exception.
444      String status;
445      Procedure p = coordinator.getProcedure(expected.getName());
446      if (p != null) {
447        status = p.getStatus();
448      } else {
449        status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
450      }
451      throw new HBaseSnapshotException("Snapshot " + ssString + " had an error.  " + status, e,
452        ProtobufUtil.createSnapshotDesc(expected));
453    }
454
455    // check to see if we are done
456    if (handler.isFinished()) {
457      LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
458      return true;
459    } else if (LOG.isDebugEnabled()) {
460      LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
461    }
462    return false;
463  }
464
465  /**
466   * Check to see if there is a snapshot in progress with the same name or on the same table.
467   * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
468   * don't allow snapshot with the same name.
469   * @param snapshot   description of the snapshot being checked.
470   * @param checkTable check if the table is already taking a snapshot.
471   * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
472   *         table.
473   */
474  synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot, boolean checkTable) {
475    if (checkTable) {
476      TableName snapshotTable = TableName.valueOf(snapshot.getTable());
477      if (isTakingSnapshot(snapshotTable)) {
478        return true;
479      }
480    }
481    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = snapshotHandlers.entrySet().iterator();
482    while (it.hasNext()) {
483      Map.Entry<TableName, SnapshotSentinel> entry = it.next();
484      SnapshotSentinel sentinel = entry.getValue();
485      if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
486        return true;
487      }
488    }
489    Iterator<Map.Entry<SnapshotDescription, Long>> spIt = snapshotToProcIdMap.entrySet().iterator();
490    while (spIt.hasNext()) {
491      Map.Entry<SnapshotDescription, Long> entry = spIt.next();
492      if (
493        snapshot.getName().equals(entry.getKey().getName())
494          && !master.getMasterProcedureExecutor().isFinished(entry.getValue())
495      ) {
496        return true;
497      }
498    }
499    return false;
500  }
501
502  /**
503   * Check to see if the specified table has a snapshot in progress. Currently we have a limitation
504   * only allowing a single snapshot per table at a time.
505   * @param tableName name of the table being snapshotted.
506   * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
507   */
508  public boolean isTakingSnapshot(final TableName tableName) {
509    return isTakingSnapshot(tableName, false);
510  }
511
512  public boolean isTableTakingAnySnapshot(final TableName tableName) {
513    return isTakingSnapshot(tableName, true);
514  }
515
516  /**
517   * Check to see if the specified table has a snapshot in progress. Since we introduce the
518   * SnapshotProcedure, it is a little bit different from before. For zk-coordinated snapshot, we
519   * can just consider tables in snapshotHandlers only, but for
520   * {@link org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure} and
521   * {@link org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure}, we need to
522   * consider tables in snapshotToProcIdMap also, for the snapshot procedure, we don't need to check
523   * if table in snapshot.
524   * @param tableName      name of the table being snapshotted.
525   * @param checkProcedure true if we should check tables in snapshotToProcIdMap
526   * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
527   */
528  private synchronized boolean isTakingSnapshot(TableName tableName, boolean checkProcedure) {
529    SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
530    if (handler != null && !handler.isFinished()) {
531      return true;
532    }
533    if (checkProcedure) {
534      for (Map.Entry<SnapshotDescription, Long> entry : snapshotToProcIdMap.entrySet()) {
535        if (
536          TableName.valueOf(entry.getKey().getTable()).equals(tableName)
537            && !master.getMasterProcedureExecutor().isFinished(entry.getValue())
538        ) {
539          return true;
540        }
541      }
542    }
543    return false;
544  }
545
546  /**
547   * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
548   * aren't already running a snapshot or restore on the requested table.
549   * @param snapshot description of the snapshot we want to start
550   * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
551   */
552  public synchronized void prepareWorkingDirectory(SnapshotDescription snapshot)
553    throws HBaseSnapshotException {
554    Path workingDir =
555      SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, master.getConfiguration());
556
557    try {
558      FileSystem workingDirFS = workingDir.getFileSystem(master.getConfiguration());
559      // delete the working directory, since we aren't running the snapshot. Likely leftovers
560      // from a failed attempt.
561      workingDirFS.delete(workingDir, true);
562
563      // recreate the working directory for the snapshot
564      if (!workingDirFS.mkdirs(workingDir)) {
565        throw new SnapshotCreationException(
566          "Couldn't create working directory (" + workingDir + ") for snapshot",
567          ProtobufUtil.createSnapshotDesc(snapshot));
568      }
569      updateWorkingDirAclsIfRequired(workingDir, workingDirFS);
570    } catch (HBaseSnapshotException e) {
571      throw e;
572    } catch (IOException e) {
573      throw new SnapshotCreationException(
574        "Exception while checking to see if snapshot could be started.", e,
575        ProtobufUtil.createSnapshotDesc(snapshot));
576    }
577  }
578
579  /**
580   * If the parent dir of the snapshot working dir (e.g. /hbase/.hbase-snapshot) has non-empty ACLs,
581   * use them for the current working dir (e.g. /hbase/.hbase-snapshot/.tmp/{snapshot-name}) so that
582   * regardless of whether the snapshot commit phase performs atomic rename or non-atomic copy of
583   * the working dir to new snapshot dir, the ACLs are retained.
584   * @param workingDir   working dir to build the snapshot.
585   * @param workingDirFS working dir file system.
586   * @throws IOException If ACL read/modify operation fails.
587   */
588  private static void updateWorkingDirAclsIfRequired(Path workingDir, FileSystem workingDirFS)
589    throws IOException {
590    if (workingDir.getParent() == null || workingDir.getParent().getParent() == null) {
591      return;
592    }
593    AclStatus snapshotWorkingParentDirStatus;
594    try {
595      snapshotWorkingParentDirStatus =
596        workingDirFS.getAclStatus(workingDir.getParent().getParent());
597    } catch (IOException | UnsupportedOperationException e) {
598      LOG.warn("Unable to retrieve ACL status for path: {}, current working dir path: {}",
599        workingDir.getParent().getParent(), workingDir, e);
600      return;
601    }
602    List<AclEntry> snapshotWorkingParentDirAclStatusEntries =
603      snapshotWorkingParentDirStatus.getEntries();
604    if (
605      snapshotWorkingParentDirAclStatusEntries != null
606        && snapshotWorkingParentDirAclStatusEntries.size() > 0
607    ) {
608      workingDirFS.modifyAclEntries(workingDir, snapshotWorkingParentDirAclStatusEntries);
609    }
610  }
611
612  /**
613   * Take a snapshot of a disabled table.
614   * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
615   * @throws IOException if the snapshot could not be started or filesystem for snapshot temporary
616   *                     directory could not be determined
617   */
618  private synchronized void snapshotDisabledTable(SnapshotDescription snapshot) throws IOException {
619    // setup the snapshot
620    prepareWorkingDirectory(snapshot);
621
622    // set the snapshot to be a disabled snapshot, since the client doesn't know about that
623    snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
624
625    // Take the snapshot of the disabled table
626    DisabledTableSnapshotHandler handler = new DisabledTableSnapshotHandler(snapshot, master, this);
627    snapshotTable(snapshot, handler);
628  }
629
630  /**
631   * Take a snapshot of an enabled table.
632   * @param snapshot description of the snapshot to take.
633   * @throws IOException if the snapshot could not be started or filesystem for snapshot temporary
634   *                     directory could not be determined
635   */
636  private synchronized void snapshotEnabledTable(SnapshotDescription snapshot) throws IOException {
637    // setup the snapshot
638    prepareWorkingDirectory(snapshot);
639
640    // Take the snapshot of the enabled table
641    EnabledTableSnapshotHandler handler = new EnabledTableSnapshotHandler(snapshot, master, this);
642    snapshotTable(snapshot, handler);
643  }
644
645  /**
646   * Take a snapshot using the specified handler. On failure the snapshot temporary working
647   * directory is removed. NOTE: prepareToTakeSnapshot() called before this one takes care of the
648   * rejecting the snapshot request if the table is busy with another snapshot/restore operation.
649   * @param snapshot the snapshot description
650   * @param handler  the snapshot handler
651   */
652  private synchronized void snapshotTable(SnapshotDescription snapshot,
653    final TakeSnapshotHandler handler) throws IOException {
654    try {
655      handler.prepare();
656      this.executorService.submit(handler);
657      this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
658    } catch (Exception e) {
659      // cleanup the working directory by trying to delete it from the fs.
660      Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir,
661        master.getConfiguration());
662      FileSystem workingDirFs = workingDir.getFileSystem(master.getConfiguration());
663      try {
664        if (!workingDirFs.delete(workingDir, true)) {
665          LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:"
666            + ClientSnapshotDescriptionUtils.toString(snapshot));
667        }
668      } catch (IOException e1) {
669        LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:"
670          + ClientSnapshotDescriptionUtils.toString(snapshot));
671      }
672      // fail the snapshot
673      throw new SnapshotCreationException("Could not build snapshot handler", e,
674        ProtobufUtil.createSnapshotDesc(snapshot));
675    }
676  }
677
678  public ReadWriteLock getTakingSnapshotLock() {
679    return this.takingSnapshotLock;
680  }
681
682  /**
683   * The snapshot operation processing as following: <br>
684   * 1. Create a Snapshot Handler, and do some initialization; <br>
685   * 2. Put the handler into snapshotHandlers <br>
686   * So when we consider if any snapshot is taking, we should consider both the takingSnapshotLock
687   * and snapshotHandlers;
688   * @return true to indicate that there're some running snapshots.
689   */
690  public synchronized boolean isTakingAnySnapshot() {
691    return this.takingSnapshotLock.getReadHoldCount() > 0 || this.snapshotHandlers.size() > 0
692      || this.snapshotToProcIdMap.size() > 0;
693  }
694
695  /**
696   * Take a snapshot based on the enabled/disabled state of the table.
697   * @throws HBaseSnapshotException when a snapshot specific exception occurs.
698   * @throws IOException            when some sort of generic IO exception occurs.
699   */
700  public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
701    this.takingSnapshotLock.readLock().lock();
702    try {
703      takeSnapshotInternal(snapshot);
704    } finally {
705      this.takingSnapshotLock.readLock().unlock();
706    }
707  }
708
709  public long takeSnapshot(SnapshotDescription snapshot, long nonceGroup, long nonce)
710    throws IOException {
711    this.takingSnapshotLock.readLock().lock();
712    try {
713      return submitSnapshotProcedure(snapshot, nonceGroup, nonce);
714    } finally {
715      this.takingSnapshotLock.readLock().unlock();
716    }
717  }
718
719  private synchronized long submitSnapshotProcedure(SnapshotDescription snapshot, long nonceGroup,
720    long nonce) throws IOException {
721    return MasterProcedureUtil
722      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(master, nonceGroup, nonce) {
723        @Override
724        protected void run() throws IOException {
725          sanityCheckBeforeSnapshot(snapshot, false);
726
727          long procId = submitProcedure(new SnapshotProcedure(
728            getMaster().getMasterProcedureExecutor().getEnvironment(), snapshot));
729
730          getMaster().getSnapshotManager().registerSnapshotProcedure(snapshot, procId);
731        }
732
733        @Override
734        protected String getDescription() {
735          return "SnapshotProcedure";
736        }
737      });
738  }
739
740  private void takeSnapshotInternal(SnapshotDescription snapshot) throws IOException {
741    TableDescriptor desc = sanityCheckBeforeSnapshot(snapshot, true);
742
743    // call pre coproc hook
744    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
745    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
746    if (cpHost != null) {
747      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
748      cpHost.preSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null));
749    }
750
751    // if the table is enabled, then have the RS run actually the snapshot work
752    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
753    if (master.getTableStateManager().isTableState(snapshotTable, TableState.State.ENABLED)) {
754      if (LOG.isDebugEnabled()) {
755        LOG.debug("Table enabled, starting distributed snapshots for {}",
756          ClientSnapshotDescriptionUtils.toString(snapshot));
757      }
758      snapshotEnabledTable(snapshot);
759      if (LOG.isDebugEnabled()) {
760        LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
761      }
762    }
763    // For disabled table, snapshot is created by the master
764    else if (master.getTableStateManager().isTableState(snapshotTable, TableState.State.DISABLED)) {
765      if (LOG.isDebugEnabled()) {
766        LOG.debug("Table is disabled, running snapshot entirely on master for {}",
767          ClientSnapshotDescriptionUtils.toString(snapshot));
768      }
769      snapshotDisabledTable(snapshot);
770      if (LOG.isDebugEnabled()) {
771        LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
772      }
773    } else {
774      LOG.error("Can't snapshot table '" + snapshot.getTable()
775        + "', isn't open or closed, we don't know what to do!");
776      TablePartiallyOpenException tpoe =
777        new TablePartiallyOpenException(snapshot.getTable() + " isn't fully open.");
778      throw new SnapshotCreationException("Table is not entirely open or closed", tpoe,
779        ProtobufUtil.createSnapshotDesc(snapshot));
780    }
781
782    // call post coproc hook
783    if (cpHost != null) {
784      cpHost.postSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null));
785    }
786  }
787
788  /**
789   * Check if the snapshot can be taken. Currently we have some limitations, for zk-coordinated
790   * snapshot, we don't allow snapshot with same name or taking multiple snapshots of a table at the
791   * same time, for procedure-coordinated snapshot, we don't allow snapshot with same name.
792   * @param snapshot   description of the snapshot being checked.
793   * @param checkTable check if the table is already taking a snapshot. For zk-coordinated snapshot,
794   *                   we need to check if another zk-coordinated snapshot is in progress, for the
795   *                   snapshot procedure, this is unnecessary.
796   * @return the table descriptor of the table
797   */
798  private synchronized TableDescriptor sanityCheckBeforeSnapshot(SnapshotDescription snapshot,
799    boolean checkTable) throws IOException {
800    // check to see if we already completed the snapshot
801    if (isSnapshotCompleted(snapshot)) {
802      throw new SnapshotExistsException(
803        "Snapshot '" + snapshot.getName() + "' already stored on the filesystem.",
804        ProtobufUtil.createSnapshotDesc(snapshot));
805    }
806    LOG.debug("No existing snapshot, attempting snapshot...");
807
808    // stop tracking "abandoned" handlers
809    cleanupSentinels();
810
811    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
812    // make sure we aren't already running a snapshot
813    if (isTakingSnapshot(snapshot, checkTable)) {
814      throw new SnapshotCreationException(
815        "Rejected taking " + ClientSnapshotDescriptionUtils.toString(snapshot)
816          + " because we are already running another snapshot"
817          + " on the same table or with the same name");
818    }
819
820    // make sure we aren't running a restore on the same table
821    if (isRestoringTable(snapshotTable)) {
822      throw new SnapshotCreationException(
823        "Rejected taking " + ClientSnapshotDescriptionUtils.toString(snapshot)
824          + " because we are already have a restore in progress on the same snapshot.");
825    }
826
827    // check to see if the table exists
828    TableDescriptor desc = null;
829    try {
830      desc = master.getTableDescriptors().get(TableName.valueOf(snapshot.getTable()));
831    } catch (FileNotFoundException e) {
832      String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
833      LOG.error(msg);
834      throw new SnapshotCreationException(msg, e, ProtobufUtil.createSnapshotDesc(snapshot));
835    } catch (IOException e) {
836      throw new SnapshotCreationException(
837        "Error while geting table description for table " + snapshot.getTable(), e,
838        ProtobufUtil.createSnapshotDesc(snapshot));
839    }
840    if (desc == null) {
841      throw new SnapshotCreationException(
842        "Table '" + snapshot.getTable() + "' doesn't exist, can't take snapshot.",
843        ProtobufUtil.createSnapshotDesc(snapshot));
844    }
845    return desc;
846  }
847
848  /**
849   * Set the handler for the current snapshot
850   * <p>
851   * Exposed for TESTING
852   * @param handler handler the master should use TODO get rid of this if possible, repackaging,
853   *                modify tests.
854   */
855  public synchronized void setSnapshotHandlerForTesting(final TableName tableName,
856    final SnapshotSentinel handler) {
857    if (handler != null) {
858      this.snapshotHandlers.put(tableName, handler);
859    } else {
860      this.snapshotHandlers.remove(tableName);
861    }
862  }
863
864  /** Returns distributed commit coordinator for all running snapshots */
865  ProcedureCoordinator getCoordinator() {
866    return coordinator;
867  }
868
869  /**
870   * Check to see if the snapshot is one of the currently completed snapshots Returns true if the
871   * snapshot exists in the "completed snapshots folder".
872   * @param snapshot expected snapshot to check
873   * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
874   *         not stored
875   * @throws IOException              if the filesystem throws an unexpected exception,
876   * @throws IllegalArgumentException if snapshot name is invalid.
877   */
878  private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
879    try {
880      final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
881      FileSystem fs = master.getMasterFileSystem().getFileSystem();
882      // check to see if the snapshot already exists
883      return fs.exists(snapshotDir);
884    } catch (IllegalArgumentException iae) {
885      throw new UnknownSnapshotException("Unexpected exception thrown", iae);
886    }
887  }
888
889  /**
890   * Clone the specified snapshot. The clone will fail if the destination table has a snapshot or
891   * restore in progress.
892   * @param reqSnapshot       Snapshot Descriptor from request
893   * @param tableName         table to clone
894   * @param snapshot          Snapshot Descriptor
895   * @param snapshotTableDesc Table Descriptor
896   * @param nonceKey          unique identifier to prevent duplicated RPC
897   * @return procId the ID of the clone snapshot procedure
898   */
899  private long cloneSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
900    final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
901    final NonceKey nonceKey, final boolean restoreAcl, final String customSFT) throws IOException {
902    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
903    TableDescriptor htd = TableDescriptorBuilder.copy(tableName, snapshotTableDesc);
904    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
905    if (cpHost != null) {
906      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
907      cpHost.preCloneSnapshot(snapshotPOJO, htd);
908    }
909    long procId;
910    try {
911      procId = cloneSnapshot(snapshot, htd, nonceKey, restoreAcl, customSFT);
912    } catch (IOException e) {
913      LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName() + " as table "
914        + tableName.getNameAsString(), e);
915      throw e;
916    }
917    LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
918
919    if (cpHost != null) {
920      cpHost.postCloneSnapshot(snapshotPOJO, htd);
921    }
922    return procId;
923  }
924
925  /**
926   * Clone the specified snapshot into a new table. The operation will fail if the destination table
927   * has a snapshot or restore in progress.
928   * @param snapshot        Snapshot Descriptor
929   * @param tableDescriptor Table Descriptor of the table to create
930   * @param nonceKey        unique identifier to prevent duplicated RPC
931   * @return procId the ID of the clone snapshot procedure
932   */
933  synchronized long cloneSnapshot(final SnapshotDescription snapshot,
934    final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl,
935    final String customSFT) throws HBaseSnapshotException {
936    TableName tableName = tableDescriptor.getTableName();
937
938    // make sure we aren't running a snapshot on the same table
939    if (isTableTakingAnySnapshot(tableName)) {
940      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
941    }
942
943    // make sure we aren't running a restore on the same table
944    if (isRestoringTable(tableName)) {
945      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
946    }
947
948    try {
949      long procId = master.getMasterProcedureExecutor().submitProcedure(
950        new CloneSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
951          tableDescriptor, snapshot, restoreAcl, customSFT),
952        nonceKey);
953      this.restoreTableToProcIdMap.put(tableName, procId);
954      return procId;
955    } catch (Exception e) {
956      String msg = "Couldn't clone the snapshot="
957        + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
958      LOG.error(msg, e);
959      throw new RestoreSnapshotException(msg, e);
960    }
961  }
962
963  /**
964   * Restore or Clone the specified snapshot
965   * @param nonceKey unique identifier to prevent duplicated RPC
966   */
967  public long restoreOrCloneSnapshot(final SnapshotDescription reqSnapshot, final NonceKey nonceKey,
968    final boolean restoreAcl, String customSFT) throws IOException {
969    FileSystem fs = master.getMasterFileSystem().getFileSystem();
970    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
971
972    // check if the snapshot exists
973    if (!fs.exists(snapshotDir)) {
974      LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
975      throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(reqSnapshot));
976    }
977
978    // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
979    // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
980    // information.
981    SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
982    SnapshotManifest manifest =
983      SnapshotManifest.open(master.getConfiguration(), fs, snapshotDir, snapshot);
984    TableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
985    TableName tableName = TableName.valueOf(reqSnapshot.getTable());
986
987    // sanity check the new table descriptor
988    TableDescriptorChecker.sanityCheck(master.getConfiguration(), snapshotTableDesc);
989
990    // stop tracking "abandoned" handlers
991    cleanupSentinels();
992
993    // Verify snapshot validity
994    SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
995
996    // Execute the restore/clone operation
997    long procId;
998    if (master.getTableDescriptors().exists(tableName)) {
999      procId =
1000        restoreSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey, restoreAcl);
1001    } else {
1002      procId = cloneSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey,
1003        restoreAcl, customSFT);
1004    }
1005    return procId;
1006  }
1007
1008  /**
1009   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
1010   * or restore in progress.
1011   * @param reqSnapshot       Snapshot Descriptor from request
1012   * @param tableName         table to restore
1013   * @param snapshot          Snapshot Descriptor
1014   * @param snapshotTableDesc Table Descriptor
1015   * @param nonceKey          unique identifier to prevent duplicated RPC
1016   * @param restoreAcl        true to restore acl of snapshot
1017   * @return procId the ID of the restore snapshot procedure
1018   */
1019  private long restoreSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
1020    final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
1021    final NonceKey nonceKey, final boolean restoreAcl) throws IOException {
1022    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
1023
1024    // have to check first if restoring the snapshot would break current SFT setup
1025    StoreFileTrackerValidationUtils.validatePreRestoreSnapshot(
1026      master.getTableDescriptors().get(tableName), snapshotTableDesc, master.getConfiguration());
1027
1028    if (
1029      master.getTableStateManager().isTableState(TableName.valueOf(snapshot.getTable()),
1030        TableState.State.ENABLED)
1031    ) {
1032      throw new UnsupportedOperationException("Table '" + TableName.valueOf(snapshot.getTable())
1033        + "' must be disabled in order to " + "perform a restore operation.");
1034    }
1035
1036    // call Coprocessor pre hook
1037    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
1038    if (cpHost != null) {
1039      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
1040      cpHost.preRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
1041    }
1042
1043    long procId;
1044    try {
1045      procId = restoreSnapshot(snapshot, snapshotTableDesc, nonceKey, restoreAcl);
1046    } catch (IOException e) {
1047      LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
1048        + " as table " + tableName.getNameAsString(), e);
1049      throw e;
1050    }
1051    LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
1052
1053    if (cpHost != null) {
1054      cpHost.postRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
1055    }
1056
1057    return procId;
1058  }
1059
1060  /**
1061   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
1062   * or restore in progress.
1063   * @param snapshot        Snapshot Descriptor
1064   * @param tableDescriptor Table Descriptor
1065   * @param nonceKey        unique identifier to prevent duplicated RPC
1066   * @param restoreAcl      true to restore acl of snapshot
1067   * @return procId the ID of the restore snapshot procedure
1068   */
1069  private synchronized long restoreSnapshot(final SnapshotDescription snapshot,
1070    final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl)
1071    throws HBaseSnapshotException {
1072    final TableName tableName = tableDescriptor.getTableName();
1073
1074    // make sure we aren't running a snapshot on the same table
1075    if (isTableTakingAnySnapshot(tableName)) {
1076      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
1077    }
1078
1079    // make sure we aren't running a restore on the same table
1080    if (isRestoringTable(tableName)) {
1081      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
1082    }
1083
1084    try {
1085      TableDescriptor oldDescriptor = master.getTableDescriptors().get(tableName);
1086      long procId = master.getMasterProcedureExecutor().submitProcedure(
1087        new RestoreSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
1088          oldDescriptor, tableDescriptor, snapshot, restoreAcl),
1089        nonceKey);
1090      this.restoreTableToProcIdMap.put(tableName, procId);
1091      return procId;
1092    } catch (Exception e) {
1093      String msg = "Couldn't restore the snapshot="
1094        + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
1095      LOG.error(msg, e);
1096      throw new RestoreSnapshotException(msg, e);
1097    }
1098  }
1099
1100  /**
1101   * Verify if the restore of the specified table is in progress.
1102   * @param tableName table under restore
1103   * @return <tt>true</tt> if there is a restore in progress of the specified table.
1104   */
1105  private synchronized boolean isRestoringTable(final TableName tableName) {
1106    Long procId = this.restoreTableToProcIdMap.get(tableName);
1107    if (procId == null) {
1108      return false;
1109    }
1110    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1111    if (procExec.isRunning() && !procExec.isFinished(procId)) {
1112      return true;
1113    } else {
1114      this.restoreTableToProcIdMap.remove(tableName);
1115      return false;
1116    }
1117  }
1118
1119  /**
1120   * Return the handler if it is currently live and has the same snapshot target name. The handler
1121   * is removed from the sentinels map if completed.
1122   * @param sentinels live handlers
1123   * @param snapshot  snapshot description
1124   * @return null if doesn't match, else a live handler.
1125   */
1126  private synchronized SnapshotSentinel removeSentinelIfFinished(
1127    final Map<TableName, SnapshotSentinel> sentinels, final SnapshotDescription snapshot) {
1128    if (!snapshot.hasTable()) {
1129      return null;
1130    }
1131
1132    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
1133    SnapshotSentinel h = sentinels.get(snapshotTable);
1134    if (h == null) {
1135      return null;
1136    }
1137
1138    if (!h.getSnapshot().getName().equals(snapshot.getName())) {
1139      // specified snapshot is to the one currently running
1140      return null;
1141    }
1142
1143    // Remove from the "in-progress" list once completed
1144    if (h.isFinished()) {
1145      sentinels.remove(snapshotTable);
1146    }
1147
1148    return h;
1149  }
1150
1151  /**
1152   * Removes "abandoned" snapshot/restore requests. As part of the HBaseAdmin snapshot/restore API
1153   * the operation status is checked until completed, and the in-progress maps are cleaned up when
1154   * the status of a completed task is requested. To avoid having sentinels staying around for long
1155   * time if something client side is failed, each operation tries to clean up the in-progress maps
1156   * sentinels finished from a long time.
1157   */
1158  private void cleanupSentinels() {
1159    cleanupSentinels(this.snapshotHandlers);
1160    cleanupCompletedRestoreInMap();
1161    cleanupCompletedSnapshotInMap();
1162  }
1163
1164  /**
1165   * Remove the sentinels that are marked as finished and the completion time has exceeded the
1166   * removal timeout.
1167   * @param sentinels map of sentinels to clean
1168   */
1169  private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
1170    long currentTime = EnvironmentEdgeManager.currentTime();
1171    long sentinelsCleanupTimeoutMillis =
1172      master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS,
1173        SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT);
1174    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator();
1175    while (it.hasNext()) {
1176      Map.Entry<TableName, SnapshotSentinel> entry = it.next();
1177      SnapshotSentinel sentinel = entry.getValue();
1178      if (
1179        sentinel.isFinished()
1180          && (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis
1181      ) {
1182        it.remove();
1183      }
1184    }
1185  }
1186
1187  /**
1188   * Remove the procedures that are marked as finished
1189   */
1190  private synchronized void cleanupCompletedRestoreInMap() {
1191    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1192    Iterator<Map.Entry<TableName, Long>> it = restoreTableToProcIdMap.entrySet().iterator();
1193    while (it.hasNext()) {
1194      Map.Entry<TableName, Long> entry = it.next();
1195      Long procId = entry.getValue();
1196      if (procExec.isRunning() && procExec.isFinished(procId)) {
1197        it.remove();
1198      }
1199    }
1200  }
1201
1202  /**
1203   * Remove the procedures that are marked as finished
1204   */
1205  private synchronized void cleanupCompletedSnapshotInMap() {
1206    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1207    Iterator<Map.Entry<SnapshotDescription, Long>> it = snapshotToProcIdMap.entrySet().iterator();
1208    while (it.hasNext()) {
1209      Map.Entry<SnapshotDescription, Long> entry = it.next();
1210      Long procId = entry.getValue();
1211      if (procExec.isRunning() && procExec.isFinished(procId)) {
1212        it.remove();
1213      }
1214    }
1215  }
1216
1217  //
1218  // Implementing Stoppable interface
1219  //
1220
1221  @Override
1222  public void stop(String why) {
1223    // short circuit
1224    if (this.stopped) return;
1225    // make sure we get stop
1226    this.stopped = true;
1227    // pass the stop onto take snapshot handlers
1228    for (SnapshotSentinel snapshotHandler : this.snapshotHandlers.values()) {
1229      snapshotHandler.cancel(why);
1230    }
1231    if (snapshotHandlerChoreCleanerTask != null) {
1232      snapshotHandlerChoreCleanerTask.cancel(true);
1233    }
1234    try {
1235      if (coordinator != null) {
1236        coordinator.close();
1237      }
1238    } catch (IOException e) {
1239      LOG.error("stop ProcedureCoordinator error", e);
1240    }
1241  }
1242
1243  @Override
1244  public boolean isStopped() {
1245    return this.stopped;
1246  }
1247
1248  /**
1249   * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
1250   * Called at the beginning of snapshot() and restoreSnapshot() methods.
1251   * @throws UnsupportedOperationException if snapshot are not supported
1252   */
1253  public void checkSnapshotSupport() throws UnsupportedOperationException {
1254    if (!this.isSnapshotSupported) {
1255      throw new UnsupportedOperationException(
1256        "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '"
1257          + HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
1258    }
1259  }
1260
1261  /**
1262   * Called at startup, to verify if snapshot operation is supported, and to avoid starting the
1263   * master if there're snapshots present but the cleaners needed are missing. Otherwise we can end
1264   * up with snapshot data loss.
1265   * @param conf The {@link Configuration} object to use
1266   * @param mfs  The MasterFileSystem to use
1267   * @throws IOException                   in case of file-system operation failure
1268   * @throws UnsupportedOperationException in case cleaners are missing and there're snapshot in the
1269   *                                       system
1270   */
1271  private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
1272    throws IOException, UnsupportedOperationException {
1273    // Verify if snapshot is disabled by the user
1274    String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
1275    boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
1276    boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
1277
1278    // Extract cleaners from conf
1279    Set<String> hfileCleaners = new HashSet<>();
1280    String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
1281    if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
1282
1283    Set<String> logCleaners = new HashSet<>();
1284    cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
1285    if (cleaners != null) Collections.addAll(logCleaners, cleaners);
1286
1287    // check if an older version of snapshot directory was present
1288    Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
1289    FileSystem fs = mfs.getFileSystem();
1290    List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir), false);
1291    if (ss != null && !ss.isEmpty()) {
1292      LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
1293      LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
1294    }
1295
1296    // If the user has enabled the snapshot, we force the cleaners to be present
1297    // otherwise we still need to check if cleaners are enabled or not and verify
1298    // that there're no snapshot in the .snapshot folder.
1299    if (snapshotEnabled) {
1300      // Inject snapshot cleaners, if snapshot.enable is true
1301      hfileCleaners.add(SnapshotHFileCleaner.class.getName());
1302      hfileCleaners.add(HFileLinkCleaner.class.getName());
1303      // If sync acl to HDFS feature is enabled, then inject the cleaner
1304      if (SnapshotScannerHDFSAclHelper.isAclSyncToHdfsEnabled(conf)) {
1305        hfileCleaners.add(SnapshotScannerHDFSAclCleaner.class.getName());
1306      }
1307
1308      // Set cleaners conf
1309      conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1310        hfileCleaners.toArray(new String[hfileCleaners.size()]));
1311      conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
1312        logCleaners.toArray(new String[logCleaners.size()]));
1313    } else {
1314      // There may be restore tables if snapshot is enabled and then disabled, so add
1315      // HFileLinkCleaner, see HBASE-26670 for more details.
1316      hfileCleaners.add(HFileLinkCleaner.class.getName());
1317      conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1318        hfileCleaners.toArray(new String[hfileCleaners.size()]));
1319      // Verify if SnapshotHFileCleaner are present
1320      snapshotEnabled = hfileCleaners.contains(SnapshotHFileCleaner.class.getName());
1321
1322      // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
1323      if (snapshotEnabled) {
1324        LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " + "but the '"
1325          + HBASE_SNAPSHOT_ENABLED + "' property "
1326          + (userDisabled ? "is set to 'false'." : "is not set."));
1327      }
1328    }
1329
1330    // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
1331    this.isSnapshotSupported = snapshotEnabled && !userDisabled;
1332
1333    // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
1334    // otherwise we end up with snapshot data loss.
1335    if (!snapshotEnabled) {
1336      LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
1337      Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
1338      if (fs.exists(snapshotDir)) {
1339        FileStatus[] snapshots = CommonFSUtils.listStatus(fs, snapshotDir,
1340          new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
1341        if (snapshots != null) {
1342          LOG.error("Snapshots are present, but cleaners are not enabled.");
1343          checkSnapshotSupport();
1344        }
1345      }
1346    }
1347  }
1348
1349  @Override
1350  public void initialize(MasterServices master, MetricsMaster metricsMaster)
1351    throws KeeperException, IOException, UnsupportedOperationException {
1352    this.master = master;
1353
1354    this.rootDir = master.getMasterFileSystem().getRootDir();
1355    checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
1356
1357    // get the configuration for the coordinator
1358    Configuration conf = master.getConfiguration();
1359    long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
1360    long timeoutMillis = Math.max(
1361      conf.getLong(SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_KEY,
1362        SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT),
1363      conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1364        SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME));
1365    int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
1366
1367    // setup the default procedure coordinator
1368    String name = master.getServerName().toString();
1369    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
1370    ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator(master.getZooKeeper(),
1371      SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
1372
1373    this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
1374    this.executorService = master.getExecutorService();
1375    this.verifyWorkerAssigner =
1376      new WorkerAssigner(master, conf.getInt("hbase.snapshot.verify.task.max", 3),
1377        new ProcedureEvent<>("snapshot-verify-worker-assigning"));
1378    restoreUnfinishedSnapshotProcedure();
1379    restoreWorkers();
1380    resetTempDir();
1381    snapshotHandlerChoreCleanerTask =
1382      scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS);
1383  }
1384
1385  private void restoreUnfinishedSnapshotProcedure() {
1386    master.getMasterProcedureExecutor().getActiveProceduresNoCopy().stream()
1387      .filter(p -> p instanceof SnapshotProcedure).filter(p -> !p.isFinished())
1388      .map(p -> (SnapshotProcedure) p).forEach(p -> {
1389        registerSnapshotProcedure(p.getSnapshot(), p.getProcId());
1390        LOG.info("restore unfinished snapshot procedure {}", p);
1391      });
1392  }
1393
1394  @Override
1395  public String getProcedureSignature() {
1396    return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
1397  }
1398
1399  @Override
1400  public void execProcedure(ProcedureDescription desc) throws IOException {
1401    takeSnapshot(toSnapshotDescription(desc));
1402  }
1403
1404  @Override
1405  public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user)
1406    throws IOException {
1407    // Done by AccessController as part of preSnapshot coprocessor hook (legacy code path).
1408    // In future, when we AC is removed for good, that check should be moved here.
1409  }
1410
1411  @Override
1412  public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
1413    return isSnapshotDone(toSnapshotDescription(desc));
1414  }
1415
1416  private SnapshotDescription toSnapshotDescription(ProcedureDescription desc) throws IOException {
1417    SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
1418    if (!desc.hasInstance()) {
1419      throw new IOException("Snapshot name is not defined: " + desc.toString());
1420    }
1421    String snapshotName = desc.getInstance();
1422    List<NameStringPair> props = desc.getConfigurationList();
1423    String table = null;
1424    for (NameStringPair prop : props) {
1425      if ("table".equalsIgnoreCase(prop.getName())) {
1426        table = prop.getValue();
1427      }
1428    }
1429    if (table == null) {
1430      throw new IOException("Snapshot table is not defined: " + desc.toString());
1431    }
1432    TableName tableName = TableName.valueOf(table);
1433    builder.setTable(tableName.getNameAsString());
1434    builder.setName(snapshotName);
1435    builder.setType(SnapshotDescription.Type.FLUSH);
1436    return builder.build();
1437  }
1438
1439  public void registerSnapshotProcedure(SnapshotDescription snapshot, long procId) {
1440    snapshotToProcIdMap.put(snapshot, procId);
1441    LOG.debug("register snapshot={}, snapshot procedure id = {}",
1442      ClientSnapshotDescriptionUtils.toString(snapshot), procId);
1443  }
1444
1445  public void unregisterSnapshotProcedure(SnapshotDescription snapshot, long procId) {
1446    snapshotToProcIdMap.remove(snapshot, procId);
1447    LOG.debug("unregister snapshot={}, snapshot procedure id = {}",
1448      ClientSnapshotDescriptionUtils.toString(snapshot), procId);
1449  }
1450
1451  public boolean snapshotProcedureEnabled() {
1452    return master.getConfiguration().getBoolean(SNAPSHOT_PROCEDURE_ENABLED,
1453      SNAPSHOT_PROCEDURE_ENABLED_DEFAULT);
1454  }
1455
1456  public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure)
1457    throws ProcedureSuspendedException {
1458    Optional<ServerName> worker = verifyWorkerAssigner.acquire();
1459    if (worker.isPresent()) {
1460      LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker.get());
1461      return worker.get();
1462    }
1463    verifyWorkerAssigner.suspend(procedure);
1464    throw new ProcedureSuspendedException();
1465  }
1466
1467  public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker,
1468    MasterProcedureScheduler scheduler) {
1469    LOG.debug("{} Release verify snapshot worker={}", procedure, worker);
1470    verifyWorkerAssigner.release(worker);
1471    verifyWorkerAssigner.wake(scheduler);
1472  }
1473
1474  private void restoreWorkers() {
1475    master.getMasterProcedureExecutor().getActiveProceduresNoCopy().stream()
1476      .filter(p -> p instanceof SnapshotVerifyProcedure).map(p -> (SnapshotVerifyProcedure) p)
1477      .filter(p -> !p.isFinished()).filter(p -> p.getServerName() != null).forEach(p -> {
1478        verifyWorkerAssigner.addUsedWorker(p.getServerName());
1479        LOG.debug("{} restores used worker {}", p, p.getServerName());
1480      });
1481  }
1482
1483  public Integer getAvailableWorker(ServerName serverName) {
1484    return verifyWorkerAssigner.getAvailableWorker(serverName);
1485  }
1486}