/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
 * This class provides an API to access the backup system table.<br>
 * Backup system table schema:
 * <ol>
 * <li>Backup sessions: rowkey = "session:" + backupId; value = serialized BackupInfo</li>
 * <li>Backup start code: rowkey = "startcode:" + backupRoot; value = startcode</li>
 * <li>Incremental backup set: rowkey = "incrbackupset:" + backupRoot; column = "meta:" + tablename
 * of included table; value = empty</li>
 * <li>Table-RS-timestamp map: rowkey = "trslm:" + backupRoot + table_name; value = map[RS -> last
 * WAL timestamp]</li>
 * <li>RS - WAL ts map: rowkey = "rslogts:" + backupRoot + server; value = last WAL timestamp</li>
 * <li>WALs recorded: rowkey = "wals:" + WAL unique file name; value = backupId and full WAL file
 * name</li>
 * </ol>
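 * <p>
 * A minimal usage sketch (illustrative only; it assumes an already-open {@code Connection} named
 * {@code conn} and a hypothetical backup id):
 *
 * <pre>
 * try (BackupSystemTable systemTable = new BackupSystemTable(conn)) {
 *   BackupInfo info = systemTable.readBackupInfo("backup_12345");
 *   if (info != null) {
 *     systemTable.updateBackupInfo(info);
 *   }
 * }
 * </pre>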
 */
@InterfaceAudience.Private
public final class BackupSystemTable implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class);

  static class WALItem {
    String backupId;
    String walFile;
    String backupRoot;

    WALItem(String backupId, String walFile, String backupRoot) {
      this.backupId = backupId;
      this.walFile = walFile;
      this.backupRoot = backupRoot;
    }

    public String getBackupId() {
      return backupId;
    }

    public String getWalFile() {
      return walFile;
    }

    public String getBackupRoot() {
      return backupRoot;
    }

    @Override
    public String toString() {
      return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile;
    }
  }

  /**
   * Backup system table (main) name
   */
  private TableName tableName;

  /**
   * Backup system table name for bulk loaded files. We keep all bulk loaded file references in a
   * separate table because we have to isolate general backup operations (create, merge, etc.)
   * from the activity of the RegionObserver
   * {@link org.apache.hadoop.hbase.backup.BackupObserver}, which controls the bulk loading
   * process.
   */
  private TableName bulkLoadTableName;

  /**
   * Stores backup sessions (contexts)
   */
  final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session");
  /**
   * Stores other metadata
   */
  final static byte[] META_FAMILY = Bytes.toBytes("meta");
  final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk");
  /**
   * Connection to HBase cluster, shared among all instances
   */
  private final Connection connection;

  private final static String BACKUP_INFO_PREFIX = "session:";
  private final static String START_CODE_ROW = "startcode:";
  private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:");
  private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c");

  private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes");
  private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no");

  private final static String INCR_BACKUP_SET = "incrbackupset:";
  private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
  private final static String RS_LOG_TS_PREFIX = "rslogts:";

  private final static String BULK_LOAD_PREFIX = "bulk:";
  private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX);
  private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row");
  private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row");

  final static byte[] TBL_COL = Bytes.toBytes("tbl");
  final static byte[] FAM_COL = Bytes.toBytes("fam");
  final static byte[] PATH_COL = Bytes.toBytes("path");

  private final static String SET_KEY_PREFIX = "backupset:";

  // separator between BULK_LOAD_PREFIX and ordinals
  private final static String BLK_LD_DELIM = ":";
  private final static byte[] EMPTY_VALUE = new byte[] {};

  // Safe delimiter in a string
  private final static String NULL = "\u0000";

  public BackupSystemTable(Connection conn) throws IOException {
    this.connection = conn;
    Configuration conf = this.connection.getConfiguration();
    tableName = BackupSystemTable.getTableName(conf);
    bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf);
    checkSystemTable();
  }

  private void checkSystemTable() throws IOException {
    try (Admin admin = connection.getAdmin()) {
      verifyNamespaceExists(admin);
      Configuration conf = connection.getConfiguration();
      if (!admin.tableExists(tableName)) {
        TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf);
        createSystemTable(admin, backupHTD);
      }
      ensureTableEnabled(admin, tableName);
      if (!admin.tableExists(bulkLoadTableName)) {
        TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
        createSystemTable(admin, blHTD);
      }
      ensureTableEnabled(admin, bulkLoadTableName);
      waitForSystemTable(admin, tableName);
      waitForSystemTable(admin, bulkLoadTableName);
    }
  }

  private void createSystemTable(Admin admin, TableDescriptor descriptor) throws IOException {
    try {
      admin.createTable(descriptor);
    } catch (TableExistsException e) {
      // swallow because this class is initialized in concurrent environments (e.g. bulk loads),
      // so it may be subject to race conditions where one caller succeeds in creating the
      // table and others fail because it now exists
      LOG.debug("Table {} already exists, ignoring", descriptor.getTableName(), e);
    }
  }

  private void verifyNamespaceExists(Admin admin) throws IOException {
    String namespaceName = tableName.getNamespaceAsString();
    NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
    NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
    boolean exists = false;
    for (NamespaceDescriptor nsd : list) {
      if (nsd.getName().equals(ns.getName())) {
        exists = true;
        break;
      }
    }
    if (!exists) {
      try {
        admin.createNamespace(ns);
      } catch (NamespaceExistException e) {
        // swallow because this class is initialized in concurrent environments (e.g. bulk loads),
        // so it may be subject to race conditions where one caller succeeds in creating the
        // namespace and others fail because it now exists
        LOG.debug("Namespace {} already exists, ignoring", ns.getName(), e);
      }
    }
  }

  private void waitForSystemTable(Admin admin, TableName tableName) throws IOException {
    // Return fast if the table is available, and avoid a log message
    if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) {
      return;
    }
    final long timeoutMs = 60000;
    long startTime = EnvironmentEdgeManager.currentTime();
    LOG.debug("Backup table {} is not yet present and available, waiting for it to become so",
      tableName);
    while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        throw (IOException) new InterruptedIOException().initCause(e);
      }
      if (EnvironmentEdgeManager.currentTime() - startTime > timeoutMs) {
        throw new IOException(
          "Failed to create backup system table " + tableName + " after " + timeoutMs + "ms");
      }
    }
    LOG.debug("Backup table {} exists and is available", tableName);
  }

  @Override
  public void close() {
    // do nothing
  }

  /**
   * Updates the status (state) of a backup session in the backup system table
   * @param info backup info
   * @throws IOException exception
   */
  public void updateBackupInfo(BackupInfo info) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("update backup status in backup system table for: " + info.getBackupId()
        + " set status=" + info.getState());
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForBackupInfo(info);
      table.put(put);
    }
  }

  /*
   * @param backupId the backup Id
   * @return Map of rows to path of bulk loaded hfile
   */
  Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
    try (Table table = connection.getTable(bulkLoadTableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res = null;
      Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      while ((res = scanner.next()) != null) {
        res.advance();
        byte[] row = CellUtil.cloneRow(res.listCells().get(0));
        for (Cell cell : res.listCells()) {
          if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
              BackupSystemTable.PATH_COL.length) == 0
          ) {
            map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
          }
        }
      }
      return map;
    }
  }

  /*
   * Used during restore
   * @param backupId the backup Id
   * @param sTableList List of tables
   * @return array of Map of family to List of Paths
   */
  public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList)
    throws IOException {
    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
    @SuppressWarnings("unchecked")
    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()];
    try (Table table = connection.getTable(bulkLoadTableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res = null;
      while ((res = scanner.next()) != null) {
        res.advance();
        TableName tbl = null;
        byte[] fam = null;
        String path = null;
        for (Cell cell : res.listCells()) {
          if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
              BackupSystemTable.TBL_COL.length) == 0
          ) {
            tbl = TableName.valueOf(CellUtil.cloneValue(cell));
          } else if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
              BackupSystemTable.FAM_COL.length) == 0
          ) {
            fam = CellUtil.cloneValue(cell);
          } else if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
              BackupSystemTable.PATH_COL.length) == 0
          ) {
            path = Bytes.toString(CellUtil.cloneValue(cell));
          }
        }
        int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList);
        if (srcIdx == -1) {
          // the table is not among the requested tables
          continue;
        }
        if (mapForSrc[srcIdx] == null) {
          mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        }
        List<Path> files;
        if (!mapForSrc[srcIdx].containsKey(fam)) {
          files = new ArrayList<>();
          mapForSrc[srcIdx].put(fam, files);
        } else {
          files = mapForSrc[srcIdx].get(fam);
        }
        files.add(new Path(path));
        if (LOG.isDebugEnabled()) {
          LOG.debug("found bulk loaded file : {} {} {}", tbl, Bytes.toString(fam), path);
        }
      }

      return mapForSrc;
    }
  }

  /**
   * Deletes backup status from the backup system table
   * @param backupId backup id
   * @throws IOException exception
   */
  public void deleteBackupInfo(String backupId) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("delete backup status in backup system table for " + backupId);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete del = createDeleteForBackupInfo(backupId);
      table.delete(del);
    }
  }

  /**
   * Registers a bulk load.
   * @param tableName     table name
   * @param region        the region receiving hfile
   * @param cfToHfilePath column family and associated hfiles
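   * <p>
   * A call might look like this (the table, region, family, and path values below are made-up
   * examples, not values produced by this class):
   *
   * <pre>
   * Map&lt;byte[], List&lt;Path&gt;&gt; cfToHfilePath = new TreeMap&lt;&gt;(Bytes.BYTES_COMPARATOR);
   * cfToHfilePath.put(Bytes.toBytes("cf"), Arrays.asList(new Path("/staging/cf/hfile1")));
   * systemTable.registerBulkLoad(TableName.valueOf("ns:table1"), Bytes.toBytes("region1"),
   *   cfToHfilePath);
   * </pre>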
   */
  public void registerBulkLoad(TableName tableName, byte[] region,
    Map<byte[], List<Path>> cfToHfilePath) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName,
        cfToHfilePath.size());
    }
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
      List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath);
      bufferedMutator.mutate(puts);
      LOG.debug("Written {} rows for bulk load of {}", puts.size(), tableName);
    }
  }

  /*
   * Removes rows recording bulk loaded hfiles from backup table
   * @param rows the rows to be deleted
   */
  public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
      List<Delete> lstDels = new ArrayList<>();
      for (byte[] row : rows) {
        Delete del = new Delete(row);
        lstDels.add(del);
        LOG.debug("Deleting bulk load row: " + Bytes.toString(row));
      }
      bufferedMutator.mutate(lstDels);
      LOG.debug("Deleted " + rows.size() + " original bulk load rows");
    }
  }

  /**
   * Reads the rows from backup table recording bulk loaded hfiles
   * @param tableList list of table names
   */
  public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
    List<BulkLoad> result = new ArrayList<>();
    for (TableName table : tableList) {
      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
      try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
        ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
        Result res;
        while ((res = scanner.next()) != null) {
          res.advance();
          String fam = null;
          String path = null;
          String region = null;
          byte[] row = null;
          for (Cell cell : res.listCells()) {
            row = CellUtil.cloneRow(cell);
            String rowStr = Bytes.toString(row);
            region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
            if (
              CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
                BackupSystemTable.FAM_COL.length) == 0
            ) {
              fam = Bytes.toString(CellUtil.cloneValue(cell));
            } else if (
              CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
                BackupSystemTable.PATH_COL.length) == 0
            ) {
              path = Bytes.toString(CellUtil.cloneValue(cell));
            }
          }
          result.add(new BulkLoad(table, region, fam, path, row));
          LOG.debug("found orig " + path + " for " + fam + " of region " + region);
        }
      }
    }
    return result;
  }

  /*
   * @param sTableList List of tables
   * @param maps array of Map of family to List of Paths
   * @param backupId the backup Id
   */
  public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
    String backupId) throws IOException {
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
      long ts = EnvironmentEdgeManager.currentTime();
      int cnt = 0;
      List<Put> puts = new ArrayList<>();
      for (int idx = 0; idx < maps.length; idx++) {
        Map<byte[], List<Path>> map = maps[idx];
        TableName tn = sTableList.get(idx);

        if (map == null) {
          continue;
        }

        for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
          byte[] fam = entry.getKey();
          List<Path> paths = entry.getValue();
          for (Path p : paths) {
            Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId,
              ts, cnt++);
            puts.add(put);
          }
        }
      }
      if (!puts.isEmpty()) {
        bufferedMutator.mutate(puts);
      }
    }
  }

  /**
   * Reads backup status object (instance of backup info) from the backup system table
   * @param backupId backup id
   * @return Current status of backup session or null
   */
  public BackupInfo readBackupInfo(String backupId) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("read backup status from backup system table for: " + backupId);
    }

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupInfo(backupId);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      return resultToBackupInfo(res);
    }
  }

  /**
   * Reads the start code (timestamp) of the last successful backup. Returns null if there is no
   * start code stored in hbase or the value is of length 0. These two cases indicate that no
   * successful backup has completed so far.
   * @param backupRoot directory path to backup destination
   * @return the timestamp of last successful backup
   * @throws IOException exception
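   * <p>
   * A null return simply means no successful backup exists yet; for example (sketch):
   *
   * <pre>
   * String startCode = systemTable.readBackupStartCode(backupRoot);
   * long since = startCode == null ? 0L : Long.parseLong(startCode);
   * </pre>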
   */
  public String readBackupStartCode(String backupRoot) throws IOException {
    LOG.trace("read backup start code from backup system table");

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForStartCode(backupRoot);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      Cell cell = res.listCells().get(0);
      byte[] val = CellUtil.cloneValue(cell);
      if (val.length == 0) {
        return null;
      }
      return new String(val, StandardCharsets.UTF_8);
    }
  }

  /**
   * Write the start code (timestamp) to the backup system table.
   * @param startCode  start code (must not be null)
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("write backup start code to backup system table " + startCode);
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStartCode(startCode.toString(), backupRoot);
      table.put(put);
    }
  }

  /**
   * Starts a new backup exclusive operation. Exclusive operations are: create, delete, merge.
   * @throws IOException if a table operation fails or an active backup exclusive operation is
   *                     already underway
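   * <p>
   * Callers are expected to pair this with {@link #finishBackupExclusiveOperation()}; a minimal
   * sketch:
   *
   * <pre>
   * systemTable.startBackupExclusiveOperation();
   * try {
   *   // run the create/delete/merge operation
   * } finally {
   *   systemTable.finishBackupExclusiveOperation();
   * }
   * </pre>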
   */
  public void startBackupExclusiveOperation() throws IOException {
    LOG.debug("Start new backup exclusive operation");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStartBackupSession();
      // First try to put if row does not exist
      if (
        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
          .ifNotExists().thenPut(put)
      ) {
        // Row exists, try to put if value == ACTIVE_SESSION_NO
        if (
          !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
            .ifEquals(ACTIVE_SESSION_NO).thenPut(put)
        ) {
          throw new ExclusiveOperationException();
        }
      }
    }
  }

  private Put createPutForStartBackupSession() {
    Put put = new Put(ACTIVE_SESSION_ROW);
    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES);
    return put;
  }

  public void finishBackupExclusiveOperation() throws IOException {
    LOG.debug("Finish backup exclusive operation");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStopBackupSession();
      if (
        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
          .ifEquals(ACTIVE_SESSION_YES).thenPut(put)
      ) {
        throw new IOException("There is no active backup exclusive operation");
      }
    }
  }

  private Put createPutForStopBackupSession() {
    Put put = new Put(ACTIVE_SESSION_ROW);
    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO);
    return put;
  }

  /**
   * Get the Region Servers log information after the last log roll from backup system table.
   * @param backupRoot root directory path to backup
   * @return RS log info
   * @throws IOException exception
   */
  public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
    throws IOException {
    LOG.trace("read region server last roll log result from backup system table");

    Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot);

    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      HashMap<String, Long> rsTimestampMap = new HashMap<>();
      while ((res = scanner.next()) != null) {
        res.advance();
        Cell cell = res.current();
        byte[] row = CellUtil.cloneRow(cell);
        String server = getServerNameForReadRegionServerLastLogRollResult(row);
        byte[] data = CellUtil.cloneValue(cell);
        rsTimestampMap.put(server, Bytes.toLong(data));
      }
      return rsTimestampMap;
    }
  }

  /**
   * Writes Region Server last roll log result (timestamp) to the backup system table
   * @param server     Region Server name
   * @param ts         last log timestamp
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
    throws IOException {
    LOG.trace("write region server last roll log result to backup system table");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
      table.put(put);
    }
  }

  /**
   * Get all completed backup information (in descending order by time)
   * @param onlyCompleted true, if only successfully completed sessions
   * @return history info of BackupCompleteData
   * @throws IOException exception
   */
  public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException {
    LOG.trace("get backup history from backup system table");

    BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY;
    ArrayList<BackupInfo> list = getBackupInfos(state);
    return BackupUtils.sortHistoryListDesc(list);
  }

  /**
   * Get all backups history
   * @return list of backup info
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistory() throws IOException {
    return getBackupHistory(false);
  }

  /**
   * Get the first n backup history records
   * @param n number of records to return; if n == -1, the limit is ignored
   * @return list of records
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getHistory(int n) throws IOException {
    List<BackupInfo> history = getBackupHistory();
    if (n == -1 || history.size() <= n) {
      return history;
    }
    return Collections.unmodifiableList(history.subList(0, n));
  }

  /**
   * Get backup history records filtered by a list of filters.
   * @param n       max number of records; if n == -1, the limit is ignored
   * @param filters list of filters
   * @return backup records
   * @throws IOException if getting the backup history fails
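   * <p>
   * For example, to fetch up to 10 completed sessions (the filter is written as a lambda and the
   * limit is an arbitrary example):
   *
   * <pre>
   * List&lt;BackupInfo&gt; done =
   *   systemTable.getBackupHistory(10, info -&gt; info.getState() == BackupState.COMPLETE);
   * </pre>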
   */
  public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException {
    if (filters.length == 0) {
      return getHistory(n);
    }

    List<BackupInfo> history = getBackupHistory();
    List<BackupInfo> result = new ArrayList<>();
    for (BackupInfo bi : history) {
      if (n >= 0 && result.size() == n) {
        break;
      }

      boolean passed = true;
      for (int i = 0; i < filters.length; i++) {
        if (!filters[i].apply(bi)) {
          passed = false;
          break;
        }
      }
      if (passed) {
        result.add(bi);
      }
    }
    return result;
  }

  /**
   * Retrieve all table names that are part of any known backup
   */
  public Set<TableName> getTablesIncludedInBackups() throws IOException {
    Set<TableName> names = new HashSet<>();
    List<BackupInfo> infos = getBackupHistory(true);
    for (BackupInfo info : infos) {
      // Incremental backups have the same tables as the preceding full backups
      if (info.getType() == BackupType.FULL) {
        names.addAll(info.getTableNames());
      }
    }
    return names;
  }

  /**
   * Get history for backup destination
   * @param backupRoot backup destination path
   * @return List of backup info
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException {
    ArrayList<BackupInfo> history = getBackupHistory(false);
    history.removeIf(info -> !backupRoot.equals(info.getBackupRootDir()));
    return history;
  }

  /**
   * Get history for a table
   * @param name table name
   * @return history for a table
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException {
    List<BackupInfo> history = getBackupHistory();
    List<BackupInfo> tableHistory = new ArrayList<>();
    for (BackupInfo info : history) {
      List<TableName> tables = info.getTableNames();
      if (tables.contains(name)) {
        tableHistory.add(info);
      }
    }
    return tableHistory;
  }

  /**
   * Goes through all backup history corresponding to the provided root folder, and collects all
   * backup info mentioning each of the provided tables.
   * @param set        the tables for which to collect the {@code BackupInfo}
   * @param backupRoot backup destination path to retrieve backup history for
   * @return a map containing (a subset of) the provided {@code TableName}s, mapped to a list of at
   *         least one {@code BackupInfo}
   * @throws IOException if getting the backup history fails
   */
  public Map<TableName, List<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set,
    String backupRoot) throws IOException {
    List<BackupInfo> history = getBackupHistory(backupRoot);
    Map<TableName, List<BackupInfo>> tableHistoryMap = new HashMap<>();
    for (BackupInfo info : history) {
      List<TableName> tables = info.getTableNames();
      for (TableName tableName : tables) {
        if (set.contains(tableName)) {
          List<BackupInfo> list =
            tableHistoryMap.computeIfAbsent(tableName, k -> new ArrayList<>());
          list.add(info);
        }
      }
    }
    return tableHistoryMap;
  }

  /**
   * Get all backup sessions with a given state (in descending order by time)
   * @param state backup session state
   * @return history info of backup info objects
   * @throws IOException exception
   */
  public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException {
    LOG.trace("get backup infos from backup system table");

    Scan scan = createScanForBackupHistory();
    ArrayList<BackupInfo> list = new ArrayList<>();

    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        BackupInfo context = cellToBackupInfo(res.current());
        if (state != BackupState.ANY && context.getState() != state) {
          continue;
        }
        list.add(context);
      }
      return list;
    }
  }

  /**
   * Write the current timestamps for each regionserver to backup system table after a successful
   * full or incremental backup. The saved timestamp is of the last log file that was backed up
   * already.
   * @param tables        tables
   * @param newTimestamps timestamps
   * @param backupRoot    root directory path to backup
   * @throws IOException exception
   */
  public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps,
    String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("write RS log time stamps to backup system table for tables ["
        + StringUtils.join(tables, ",") + "]");
    }
    List<Put> puts = new ArrayList<>();
    for (TableName table : tables) {
      byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
      Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot);
      puts.add(put);
    }
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(tableName)) {
      bufferedMutator.mutate(puts);
    }
  }

  /**
   * Read the timestamp for each region server log after the last successful backup. Each table
   * has its own set of timestamps. The info is stored for each table as a serialized map of
   * rs->timestamp.
   * @param backupRoot root directory path to backup
   * @return map of table name to a map of region server ("host:port") to its last backed-up WAL
   *         timestamp
   * @throws IOException exception
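   * <p>
   * Reading it back might look like this (sketch):
   *
   * <pre>
   * Map&lt;TableName, Map&lt;String, Long&gt;&gt; byTable = systemTable.readLogTimestampMap(backupRoot);
   * for (Map.Entry&lt;TableName, Map&lt;String, Long&gt;&gt; e : byTable.entrySet()) {
   *   // e.getValue() maps "host:port" to the last backed-up WAL timestamp
   * }
   * </pre>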
   */
  public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot)
    throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("read RS log ts from backup system table for root=" + backupRoot);
    }

    Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>();

    Scan scan = createScanForReadLogTimestampMap(backupRoot);
    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        Cell cell = res.current();
        byte[] row = CellUtil.cloneRow(cell);
        String tabName = getTableNameForReadLogTimestampMap(row);
        TableName tn = TableName.valueOf(tabName);
        byte[] data = CellUtil.cloneValue(cell);
        if (data == null) {
          throw new IOException("Data of last backup data from backup system table "
            + "is empty. Create a backup first.");
        }
        if (data.length > 0) {
          HashMap<String, Long> lastBackup =
            fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data));
          tableTimestampMap.put(tn, lastBackup);
        }
      }
      return tableTimestampMap;
    }
  }

  private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table,
    Map<String, Long> map) {
    BackupProtos.TableServerTimestamp.Builder tstBuilder =
      BackupProtos.TableServerTimestamp.newBuilder();
    tstBuilder
      .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table));

    for (Entry<String, Long> entry : map.entrySet()) {
      BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder();
      HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder();
      ServerName sn = ServerName.parseServerName(entry.getKey());
      snBuilder.setHostName(sn.getHostname());
      snBuilder.setPort(sn.getPort());
      builder.setServerName(snBuilder.build());
      builder.setTimestamp(entry.getValue());
      tstBuilder.addServerTimestamp(builder.build());
    }

    return tstBuilder.build();
  }

  private HashMap<String, Long>
    fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) {

    HashMap<String, Long> map = new HashMap<>();
    List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList();
    for (BackupProtos.ServerTimestamp st : list) {
      ServerName sn =
        org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName());
      map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp());
    }
    return map;
  }

  /**
   * Return the current tables covered by incremental backup.
   * @param backupRoot root directory path to backup
   * @return set of tableNames
   * @throws IOException exception
   */
  public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException {
    LOG.trace("get incremental backup table set from backup system table");

    TreeSet<TableName> set = new TreeSet<>();

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForIncrBackupTableSet(backupRoot);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return set;
      }
      List<Cell> cells = res.listCells();
      for (Cell cell : cells) {
        // qualifier = table name - we use table names as qualifiers
        set.add(TableName.valueOf(CellUtil.cloneQualifier(cell)));
      }
      return set;
    }
  }

  /**
   * Add tables to global incremental backup set
   * @param tables     set of tables
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot)
    throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot
        + " tables [" + StringUtils.join(tables, " ") + "]");
    }
    if (LOG.isDebugEnabled()) {
      tables.forEach(table -> LOG.debug(Objects.toString(table)));
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForIncrBackupTableSet(tables, backupRoot);
      table.put(put);
    }
  }

  /**
   * Deletes incremental backup set for a backup destination
   * @param backupRoot backup root
   */
  public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        "Delete incremental backup table set from backup system table. ROOT=" + backupRoot);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete delete = createDeleteForIncrBackupTableSet(backupRoot);
      table.delete(delete);
    }
  }

  /**
   * Checks if we have at least one backup session in the backup system table. This API is used by
   * BackupLogCleaner.
   * @return true if at least one session exists in the backup system table
   * @throws IOException exception
   */
  public boolean hasBackupSessions() throws IOException {
    LOG.trace("Has backup sessions from backup system table");

    Scan scan = createScanForBackupHistory();
    scan.setCaching(1);
    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      return scanner.next() != null;
    }
  }

  /**
   * BACKUP SETS
   */

  /**
   * Get backup set list
   * @return backup set list
   * @throws IOException if a table or scanner operation fails
   */
  public List<String> listBackupSets() throws IOException {
    LOG.trace("Backup set list");

    List<String> list = new ArrayList<>();
    try (Table table = connection.getTable(tableName)) {
      Scan scan = createScanForBackupSetList();
      scan.readVersions(1);
      try (ResultScanner scanner = table.getScanner(scan)) {
        Result res;
        while ((res = scanner.next()) != null) {
          res.advance();
          list.add(cellKeyToBackupSetName(res.current()));
        }
        return list;
      }
    }
  }

  /**
   * Get backup set description (list of tables)
   * @param name set's name
   * @return list of tables in a backup set
   * @throws IOException if a table operation fails
   */
  public List<TableName> describeBackupSet(String name) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(" Backup set describe: " + name);
    }
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      res.advance();
      String[] tables = cellValueToBackupSet(res.current());
      return Arrays.stream(tables).map(TableName::valueOf).collect(Collectors.toList());
    }
  }

  /**
   * Add backup set (list of tables)
   * @param name      set name
   * @param newTables list of table names to add
   * @throws IOException if a table operation fails
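   * <p>
   * For instance (the set and table names below are arbitrary examples):
   *
   * <pre>
   * systemTable.addToBackupSet("nightly", new String[] { "ns1:t1", "ns1:t2" });
   * </pre>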
   */
  public void addToBackupSet(String name, String[] newTables) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]");
    }
    String[] union = null;
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        union = newTables;
      } else {
        res.advance();
        String[] tables = cellValueToBackupSet(res.current());
        union = merge(tables, newTables);
      }
      Put put = createPutForBackupSet(name, union);
      table.put(put);
    }
  }

  /**
   * Remove tables from backup set (list of tables)
   * @param name     set name
   * @param toRemove list of tables
   * @throws IOException if a table operation or deleting the backup set fails
   */
  public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        " Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]");
    }
    String[] disjoint;
    String[] tables;
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        LOG.warn("Backup set '" + name + "' not found.");
        return;
      } else {
        res.advance();
        tables = cellValueToBackupSet(res.current());
        disjoint = disjoin(tables, toRemove);
      }
      if (disjoint.length > 0 && disjoint.length != tables.length) {
        Put put = createPutForBackupSet(name, disjoint);
        table.put(put);
      } else if (disjoint.length == tables.length) {
        LOG.warn("Backup set '" + name + "' does not contain tables ["
          + StringUtils.join(toRemove, " ") + "]");
      } else { // disjoint.length == 0 and tables.length > 0
        // Delete backup set
        LOG.info("Backup set '" + name + "' is empty. Deleting.");
        deleteBackupSet(name);
      }
    }
  }

  private String[] merge(String[] existingTables, String[] newTables) {
    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
    tables.addAll(Arrays.asList(newTables));
    return tables.toArray(new String[0]);
  }

  private String[] disjoin(String[] existingTables, String[] toRemove) {
    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
    Arrays.asList(toRemove).forEach(table -> tables.remove(table));
    return tables.toArray(new String[0]);
  }

  /**
   * Delete backup set
   * @param name set's name
   * @throws IOException if getting or deleting the table fails
   */
  public void deleteBackupSet(String name) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(" Backup set delete: " + name);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete del = createDeleteForBackupSet(name);
      table.delete(del);
    }
  }

  /**
   * Get backup system table descriptor
   * @return table's descriptor
   */
  public static TableDescriptor getSystemTableDescriptor(Configuration conf) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf));

    ColumnFamilyDescriptorBuilder colBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);

    colBuilder.setMaxVersions(1);
    int ttl = conf.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
    colBuilder.setTimeToLive(ttl);

    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
    builder.setColumnFamily(colSessionsDesc);

    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
    colBuilder.setTimeToLive(ttl);
    builder.setColumnFamily(colBuilder.build());
    return builder.build();
  }

  public static TableName getTableName(Configuration conf) {
    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT);
    return TableName.valueOf(name);
  }

  public static String getTableNameAsString(Configuration conf) {
    return getTableName(conf).getNameAsString();
  }

  public static String getSnapshotName(Configuration conf) {
    return "snapshot_" + getTableNameAsString(conf).replace(":", "_");
  }

  /**
   * Get backup system table descriptor for the bulk loaded data tracking table
   * @return table's descriptor
   */
  public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) {
    TableDescriptorBuilder builder =
      TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf));

    ColumnFamilyDescriptorBuilder colBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
    colBuilder.setMaxVersions(1);
    int ttl = conf.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
    colBuilder.setTimeToLive(ttl);
    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
    builder.setColumnFamily(colSessionsDesc);
    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
    colBuilder.setTimeToLive(ttl);
    builder.setColumnFamily(colBuilder.build());
    return builder.build();
  }

  public static TableName getTableNameForBulkLoadedData(Configuration conf) {
    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk";
    return TableName.valueOf(name);
  }

  /**
   * Creates Put operation for a given backup info object
   * @param context backup info
   * @return put operation
   * @throws IOException exception
   */
  private Put createPutForBackupInfo(BackupInfo context) throws IOException {
    Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId()));
    put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"),
      context.toByteArray());
    return put;
  }

  /**
   * Creates Get operation for a given backup id
   * @param backupId backup's ID
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForBackupInfo(String backupId) throws IOException {
    Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId));
    get.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Delete operation for a given backup id
   * @param backupId backup's ID
   * @return delete operation
   */
  private Delete createDeleteForBackupInfo(String backupId) {
    Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId));
    del.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    return del;
  }

  /**
   * Converts Result to BackupInfo
   * @param res HBase result
   * @return backup info instance
   * @throws IOException exception
   */
  private BackupInfo resultToBackupInfo(Result res) throws IOException {
    res.advance();
    Cell cell = res.current();
    return cellToBackupInfo(cell);
  }

  /**
   * Creates Get operation to retrieve start code from backup system table
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForStartCode(String rootPath) throws IOException {
    Get get = new Get(rowkey(START_CODE_ROW, rootPath));
    get.addFamily(BackupSystemTable.META_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Put operation to store start code to backup system table
   * @return put operation
   */
  private Put createPutForStartCode(String startCode, String rootPath) {
    Put put = new Put(rowkey(START_CODE_ROW, rootPath));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"),
      Bytes.toBytes(startCode));
    return put;
  }

  /**
   * Creates Get to retrieve incremental backup table set from backup system table
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException {
    Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot));
    get.addFamily(BackupSystemTable.META_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Put to store incremental backup table set
   * @param tables tables
   * @return put operation
   */
  private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) {
    Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot));
    for (TableName table : tables) {
      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()),
        EMPTY_VALUE);
    }
    return put;
  }

  /**
   * Creates Delete for incremental backup table set
   * @param backupRoot backup root
   * @return delete operation
   */
  private Delete createDeleteForIncrBackupTableSet(String backupRoot) {
    Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot));
    delete.addFamily(BackupSystemTable.META_FAMILY);
    return delete;
  }

  /**
   * Creates Scan operation to load backup history
   * @return scan operation
   */
  private Scan createScanForBackupHistory() {
    Scan scan = new Scan();
    byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX);
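    // The stop row is the start prefix with its last byte incremented: the smallest key that
    // sorts after every row key beginning with the prefix, so the scan covers exactly the
    // "session:" rows.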
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    scan.readVersions(1);
    return scan;
  }

  /**
   * Converts cell to backup info instance.
   * @param current current cell
   * @return backup info instance
   * @throws IOException exception
   */
  private BackupInfo cellToBackupInfo(Cell current) throws IOException {
    byte[] data = CellUtil.cloneValue(current);
    return BackupInfo.fromByteArray(data);
  }

  /**
   * Creates Put to write RS last roll log timestamp map
   * @param table table
   * @param smap  map, containing RS:ts
   * @return put operation
   */
  private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap,
    String backupRoot) {
    Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString()));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap);
    return put;
  }

  /**
   * Creates Scan to load table -> { RS -> ts } map of maps
   * @return scan operation
   */
  private Scan createScanForReadLogTimestampMap(String backupRoot) {
    Scan scan = new Scan();
    scan.setStartStopRowForPrefixScan(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL));
    scan.addFamily(BackupSystemTable.META_FAMILY);

    return scan;
  }

  /**
   * Get table name from rowkey
   * @param cloneRow rowkey
   * @return table name
   */
  private String getTableNameForReadLogTimestampMap(byte[] cloneRow) {
    String s = Bytes.toString(cloneRow);
    int index = s.lastIndexOf(NULL);
    return s.substring(index + 1);
  }

  /**
   * Creates Put to store RS last log result
   * @param server    server name
   * @param timestamp log roll result (timestamp)
   * @return put operation
   */
  private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp,
    String backupRoot) {
    Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"),
      Bytes.toBytes(timestamp));
    return put;
  }

  /**
   * Creates Scan operation to load last RS log roll results
   * @return scan operation
   */
  private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) {
    Scan scan = new Scan();
    scan.setStartStopRowForPrefixScan(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL));
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.readVersions(1);

    return scan;
  }

  /**
   * Get server's name from rowkey
   * @param row rowkey
   * @return server's name
   */
  private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
    String s = Bytes.toString(row);
    int index = s.lastIndexOf(NULL);
    return s.substring(index + 1);
  }

  /**
   * Creates Put's for bulk loads.
   */
  private static List<Put> createPutForBulkLoad(TableName table, byte[] region,
    Map<byte[], List<Path>> columnFamilyToHFilePaths) {
    List<Put> puts = new ArrayList<>();
    for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) {
      for (Path path : entry.getValue()) {
        String file = path.toString();
        int lastSlash = file.lastIndexOf("/");
        String filename = file.substring(lastSlash + 1);
        Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
          Bytes.toString(region), BLK_LD_DELIM, filename));
        put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
        put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
        put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
        puts.add(put);
        LOG.debug("Done writing bulk path {} for {} {}", file, table, Bytes.toString(region));
      }
    }
    return puts;
  }
1469
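  /**
   * Takes a snapshot of the backup system table so that it can later be restored via
   * {@link #restoreFromSnapshot(Connection)}.
   * @param conn HBase connection
   * @throws IOException if taking the snapshot fails
   */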
  public static void snapshot(Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      Configuration conf = conn.getConfiguration();
      admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf));
    }
  }

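  /**
   * Restores the backup system table from its snapshot. If the snapshot does not exist (for
   * example, it was already removed by a successfully completed backup), a warning is logged and
   * the table is left as is.
   * @param conn HBase connection
   * @throws IOException if the restore fails
   */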
  public static void restoreFromSnapshot(Connection conn) throws IOException {
    Configuration conf = conn.getConfiguration();
    LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot");
    try (Admin admin = conn.getAdmin()) {
      String snapshotName = BackupSystemTable.getSnapshotName(conf);
      if (snapshotExists(admin, snapshotName)) {
        admin.disableTable(BackupSystemTable.getTableName(conf));
        admin.restoreSnapshot(snapshotName);
        admin.enableTable(BackupSystemTable.getTableName(conf));
        LOG.debug("Done restoring backup system table");
      } else {
        // Snapshot does not exist, i.e. completeBackup failed after
        // deleting the backup system table snapshot.
        // In this case we log a WARN and proceed.
        LOG.warn(
          "Could not restore backup system table. Snapshot " + snapshotName + " does not exist.");
      }
    }
  }

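  /**
   * Checks whether a snapshot with the given name exists.
   * @param admin        admin instance; not closed by this method
   * @param snapshotName name of the snapshot to look for
   * @return true if the snapshot exists
   * @throws IOException if listing snapshots fails
   */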
  private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
    List<SnapshotDescription> list = admin.listSnapshots();
    for (SnapshotDescription desc : list) {
      if (desc.getName().equals(snapshotName)) {
        return true;
      }
    }
    return false;
  }

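  /**
   * Checks whether the backup system table snapshot exists.
   * @param conn HBase connection
   * @return true if the snapshot exists
   * @throws IOException if listing snapshots fails
   */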
  public static boolean snapshotExists(Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return snapshotExists(admin, getSnapshotName(conn.getConfiguration()));
    }
  }

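  /**
   * Deletes the backup system table snapshot if it exists; otherwise logs an error and proceeds.
   * @param conn HBase connection
   * @throws IOException if deleting the snapshot fails
   */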
  public static void deleteSnapshot(Connection conn) throws IOException {
    Configuration conf = conn.getConfiguration();
    LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system");
    try (Admin admin = conn.getAdmin()) {
      String snapshotName = BackupSystemTable.getSnapshotName(conf);
      if (snapshotExists(admin, snapshotName)) {
        admin.deleteSnapshot(snapshotName);
        LOG.debug("Done deleting backup system table snapshot");
      } else {
        LOG.error("Snapshot " + snapshotName + " does not exist");
      }
    }
  }

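  /**
   * Creates Deletes that remove all bulk load records for the given tables.
   * @param tables list of table names
   * @return list of delete operations
   */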
  public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> tables) {
    List<Delete> deletes = new ArrayList<>(tables.size());
    for (TableName table : tables) {
      Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM));
      del.addFamily(BackupSystemTable.META_FAMILY);
      deletes.add(del);
    }
    return deletes;
  }

  private Put createPutForDeleteOperation(String[] backupIdList) {
    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
    Put put = new Put(DELETE_OP_ROW);
    put.addColumn(META_FAMILY, FAM_COL, value);
    return put;
  }

  private Delete createDeleteForBackupDeleteOperation() {
    Delete delete = new Delete(DELETE_OP_ROW);
    delete.addFamily(META_FAMILY);
    return delete;
  }

  private Get createGetForDeleteOperation() {
    Get get = new Get(DELETE_OP_ROW);
    get.addFamily(META_FAMILY);
    return get;
  }

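  /**
   * Registers an active backup delete operation by persisting the list of backup ids under a
   * single well-known row, so that an interrupted delete can later be detected. A minimal usage
   * sketch (assuming an open {@code Connection conn}; the backup id is hypothetical):
   *
   * <pre>
   * try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
   *   sysTable.startDeleteOperation(new String[] { "backup_12345" });
   * }
   * </pre>
   *
   * @param backupIdList ids of the backups being deleted
   * @throws IOException if the write fails
   */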
  public void startDeleteOperation(String[] backupIdList) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList, ","));
    }
    Put put = createPutForDeleteOperation(backupIdList);
    try (Table table = connection.getTable(tableName)) {
      table.put(put);
    }
  }

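  /**
   * Clears the marker row of the active backup delete operation.
   * @throws IOException if the delete fails
   */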
  public void finishDeleteOperation() throws IOException {
    LOG.trace("Finish delete operation for backup ids");

    Delete delete = createDeleteForBackupDeleteOperation();
    try (Table table = connection.getTable(tableName)) {
      table.delete(delete);
    }
  }

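  /**
   * Reads the backup ids of the delete operation currently in progress.
   * @return backup ids, or null if no delete operation is recorded
   * @throws IOException if the read fails
   */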
  public String[] getListOfBackupIdsFromDeleteOperation() throws IOException {
    LOG.trace("Get delete operation for backup ids");

    Get get = createGetForDeleteOperation();
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      Cell cell = res.listCells().get(0);
      byte[] val = CellUtil.cloneValue(cell);
      if (val.length == 0) {
        return null;
      }
      return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8))
        .toArray(String[]::new);
    }
  }

  private Put createPutForMergeOperation(String[] backupIdList) {
    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
    Put put = new Put(MERGE_OP_ROW);
    put.addColumn(META_FAMILY, FAM_COL, value);
    return put;
  }

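  /**
   * Checks whether a backup merge operation is currently in progress.
   * @return true if the merge operation marker row exists
   * @throws IOException if the read fails
   */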
  public boolean isMergeInProgress() throws IOException {
    Get get = new Get(MERGE_OP_ROW);
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(get);
      return !res.isEmpty();
    }
  }

  private Put createPutForUpdateTablesForMerge(List<TableName> tables) {
    byte[] value = Bytes.toBytes(StringUtils.join(tables, ","));
    Put put = new Put(MERGE_OP_ROW);
    put.addColumn(META_FAMILY, PATH_COL, value);
    return put;
  }

  private Delete createDeleteForBackupMergeOperation() {
    Delete delete = new Delete(MERGE_OP_ROW);
    delete.addFamily(META_FAMILY);
    return delete;
  }

  private Get createGetForMergeOperation() {
    Get get = new Get(MERGE_OP_ROW);
    get.addFamily(META_FAMILY);
    return get;
  }

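  /**
   * Registers an active backup merge operation by persisting the list of backup ids under a
   * single well-known row, so that an interrupted merge can later be detected.
   * @param backupIdList ids of the backups being merged
   * @throws IOException if the write fails
   */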
  public void startMergeOperation(String[] backupIdList) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList, ","));
    }
    Put put = createPutForMergeOperation(backupIdList);
    try (Table table = connection.getTable(tableName)) {
      table.put(put);
    }
  }

  public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Update tables for merge : " + StringUtils.join(tables, ","));
    }
    Put put = createPutForUpdateTablesForMerge(tables);
    try (Table table = connection.getTable(tableName)) {
      table.put(put);
    }
  }

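  /**
   * Clears the marker row of the active backup merge operation.
   * @throws IOException if the delete fails
   */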
  public void finishMergeOperation() throws IOException {
    LOG.trace("Finish merge operation for backup ids");

    Delete delete = createDeleteForBackupMergeOperation();
    try (Table table = connection.getTable(tableName)) {
      table.delete(delete);
    }
  }

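  /**
   * Reads the backup ids of the merge operation currently in progress.
   * @return backup ids, or null if no merge operation is recorded
   * @throws IOException if the read fails
   */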
  public String[] getListOfBackupIdsFromMergeOperation() throws IOException {
    LOG.trace("Get backup ids for merge operation");

    Get get = createGetForMergeOperation();
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      Cell cell = res.listCells().get(0);
      byte[] val = CellUtil.cloneValue(cell);
      if (val.length == 0) {
        return null;
      }
      return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8))
        .toArray(String[]::new);
    }
  }

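  /**
   * Creates a Scan over all bulk load records for the given table.
   * @param table table name
   * @return scan operation
   */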
  static Scan createScanForOrigBulkLoadedFiles(TableName table) {
    Scan scan = new Scan();
    byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.readVersions(1);
    return scan;
  }

  static String getTableNameFromOrigBulkLoadRow(String rowStr) {
    // format is bulk : namespace : table : region : file
    return Iterators.get(Splitter.onPattern(BLK_LD_DELIM).split(rowStr).iterator(), 1);
  }

  static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
    // format is bulk : namespace : table : region : file
    List<String> parts = Splitter.onPattern(BLK_LD_DELIM).splitToList(rowStr);
    Iterator<String> i = parts.iterator();
    int idx = 3;
    if (parts.size() == 4) {
      // the table is in the default namespace
      idx = 2;
    }
    String region = Iterators.get(i, idx);
    LOG.debug("bulk row string {} region {}", rowStr, region);
    return region;
  }

  /**
   * Creates a Scan to query bulk loaded hfiles which have been copied by incremental backup.
   * @param backupId the backup id; it can be null when querying for all tables
   * @return the Scan object
   */
  static Scan createScanForBulkLoadedFiles(String backupId) {
    Scan scan = new Scan();
    byte[] startRow =
      backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.readVersions(1);
    return scan;
  }

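  /**
   * Creates Put for a bulk loaded hfile that has been copied by incremental backup.
   * @param tn       table name
   * @param fam      column family
   * @param p        hfile path
   * @param backupId backup id (rowkey component)
   * @param ts       timestamp (rowkey component)
   * @param idx      file index (rowkey component)
   * @return put operation
   */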
  static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId,
    long ts, int idx) {
    Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx));
    put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName());
    put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam);
    put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(p));
    return put;
  }

  /**
   * Creates Scan operation to load backup set list
   * @return scan operation
   */
  private Scan createScanForBackupSetList() {
    Scan scan = new Scan();
    byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    return scan;
  }

  /**
   * Creates Get operation to load backup set content
   * @param name backup set's name
   * @return get operation
   */
  private Get createGetForBackupSet(String name) {
    Get get = new Get(rowkey(SET_KEY_PREFIX, name));
    get.addFamily(BackupSystemTable.META_FAMILY);
    return get;
  }

  /**
   * Creates Delete operation to delete backup set content
   * @param name backup set's name
   * @return delete operation
   */
  private Delete createDeleteForBackupSet(String name) {
    Delete del = new Delete(rowkey(SET_KEY_PREFIX, name));
    del.addFamily(BackupSystemTable.META_FAMILY);
    return del;
  }

  /**
   * Creates Put operation to update backup set content
   * @param name   backup set's name
   * @param tables list of tables
   * @return put operation
   */
  private Put createPutForBackupSet(String name, String[] tables) {
    Put put = new Put(rowkey(SET_KEY_PREFIX, name));
    byte[] value = convertToByteArray(tables);
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value);
    return put;
  }

  private byte[] convertToByteArray(String[] tables) {
    return Bytes.toBytes(StringUtils.join(tables, ","));
  }

  /**
   * Converts cell to backup set list.
   * @param current current cell
   * @return backup set as array of table names
   */
  private String[] cellValueToBackupSet(Cell current) {
    byte[] data = CellUtil.cloneValue(current);
    if (!ArrayUtils.isEmpty(data)) {
      return Bytes.toString(data).split(",");
    }
    return new String[0];
  }

  /**
   * Converts cell key to backup set name.
   * @param current current cell
   * @return backup set name
   */
  private String cellKeyToBackupSetName(Cell current) {
    byte[] data = CellUtil.cloneRow(current);
    return Bytes.toString(data).substring(SET_KEY_PREFIX.length());
  }

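  /**
   * Concatenates the given strings into a single rowkey byte array; for example,
   * {@code rowkey("set:", "name")} yields the bytes of {@code "set:name"}.
   * @param s     first rowkey component
   * @param other remaining rowkey components
   * @return rowkey bytes
   */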
  private static byte[] rowkey(String s, String... other) {
    StringBuilder sb = new StringBuilder(s);
    for (String ss : other) {
      sb.append(ss);
    }
    return Bytes.toBytes(sb.toString());
  }

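  /**
   * Enables the given table if it is not already enabled; a concurrent enable, surfacing as
   * {@link TableNotDisabledException}, is treated as success.
   * @param admin     admin instance
   * @param tableName table to enable
   * @throws IOException if enabling the table fails
   */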
  private static void ensureTableEnabled(Admin admin, TableName tableName) throws IOException {
    if (!admin.isTableEnabled(tableName)) {
      try {
        admin.enableTable(tableName);
      } catch (TableNotDisabledException ignored) {
        LOG.info("Table {} is not disabled, ignoring enable request", tableName);
      }
    }
  }
}