/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;

/**
 * Incremental backup implementation. See the {@link #execute() execute} method.
 */
@InterfaceAudience.Private
public class IncrementalTableBackupClient extends TableBackupClient {
  private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class);

  protected IncrementalTableBackupClient() {
  }

  public IncrementalTableBackupClient(final Connection conn, final String backupId,
    BackupRequest request) throws IOException {
    super(conn, backupId, request);
  }

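  /**
   * Filters the given WAL file list, keeping only files that still exist or that live in the
   * active WAL directory. Missing files are logged and dropped.
   * @param incrBackupFileList the candidate WAL file paths
   * @return the subset of paths that still exist or belong to the active WAL directory
   */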
  protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
    List<String> list = new ArrayList<>();
    for (String file : incrBackupFileList) {
      Path p = new Path(file);
      if (fs.exists(p) || isActiveWalPath(p)) {
        list.add(file);
      } else {
        LOG.warn("Can't find file: " + file);
      }
    }
    return list;
  }

  /**
   * Checks whether a given path belongs to the active WAL directory, i.e. it has not yet been
   * archived.
   * @param p the WAL file path to check
   * @return true if the path is an active (non-archived) WAL file
   */
  protected boolean isActiveWalPath(Path p) {
    return !AbstractFSWALProvider.isArchivedLogFile(p);
  }

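  /**
   * Returns the index of the given table in the provided table list, -1 if it is not present, or 0
   * if the list is null.
   * @param tbl        the table to look up
   * @param sTableList the list of tables to search
   */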
  protected static int getIndex(TableName tbl, List<TableName> sTableList) {
    if (sTableList == null) {
      return 0;
    }

    for (int i = 0; i < sTableList.size(); i++) {
      if (tbl.equals(sTableList.get(i))) {
        return i;
      }
    }
    return -1;
  }

  /**
   * Reads the bulk load records from the backup system table, iterates through the records to form
   * the paths of the bulk loaded HFiles, and copies those HFiles to the backup destination. This
   * method does NOT clean up the entries in the bulk load system table. Those entries should not be
   * cleaned until the backup is marked as complete.
   * @param tablesToBackup list of tables to be backed up
   * @return the bulk load records that were processed for the given tables
   */
  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
    Map<TableName, MergeSplitBulkloadInfo> toBulkload = new HashMap<>();
    List<BulkLoad> bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
    FileSystem tgtFs;
    try {
      tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
    } catch (URISyntaxException use) {
      throw new IOException("Unable to get FileSystem", use);
    }
    Path rootdir = CommonFSUtils.getRootDir(conf);
    Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);

    for (BulkLoad bulkLoad : bulkLoads) {
      TableName srcTable = bulkLoad.getTableName();
      MergeSplitBulkloadInfo bulkloadInfo =
        toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
      String regionName = bulkLoad.getRegion();
      String fam = bulkLoad.getColumnFamily();
      String filename = FilenameUtils.getName(bulkLoad.getHfilePath());

      if (!tablesToBackup.contains(srcTable)) {
        LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
        continue;
      }
      Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
      Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);

      String srcTableQualifier = srcTable.getQualifierAsString();
      String srcTableNs = srcTable.getNamespaceAsString();
      Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
        + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
      if (!tgtFs.mkdirs(tgtFam)) {
        throw new IOException("couldn't create " + tgtFam);
      }
      Path tgt = new Path(tgtFam, filename);

      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
      Path archive = new Path(archiveDir, filename);

      if (fs.exists(p)) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.getHfilePath(), p.getParent(),
            srcTableQualifier);
          LOG.trace("copying {} to {}", p, tgt);
        }
        bulkloadInfo.addActiveFile(p.toString());
      } else if (fs.exists(archive)) {
        LOG.debug("copying archive {} to {}", archive, tgt);
        bulkloadInfo.addArchiveFiles(archive.toString());
      }
    }

    for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) {
      mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(),
        bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs);
    }

    return bulkLoads;
  }

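  /**
   * Merges, splits and copies the given bulk loaded HFiles to the backup destination. Active files
   * may be archived by HBase while the copy is in progress, so on failure the file lists are
   * refreshed and the copy is retried with the updated lists.
   * @param activeFiles  bulk loaded HFiles still under the table directory
   * @param archiveFiles bulk loaded HFiles that have already been moved to the archive
   * @param tn           the table the files belong to
   * @param tgtFs        the target (backup destination) file system
   */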
  private void mergeSplitAndCopyBulkloadedHFiles(List<String> activeFiles,
    List<String> archiveFiles, TableName tn, FileSystem tgtFs) throws IOException {
    int attempt = 1;

    while (!activeFiles.isEmpty()) {
      LOG.info("MergeSplit {} active bulk loaded files. Attempt={}", activeFiles.size(), attempt++);
      // An active file can be archived during the copy operation,
      // so we need to handle this case properly
      try {
        mergeSplitAndCopyBulkloadedHFiles(activeFiles, tn, tgtFs);
        break;
      } catch (IOException e) {
        int numActiveFiles = activeFiles.size();
        updateFileLists(activeFiles, archiveFiles);
        if (activeFiles.size() < numActiveFiles) {
          continue;
        }

        throw e;
      }
    }

    if (!archiveFiles.isEmpty()) {
      mergeSplitAndCopyBulkloadedHFiles(archiveFiles, tn, tgtFs);
    }
  }

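  /**
   * Runs a {@link MapReduceHFileSplitterJob} over the given bulk loaded HFiles, writing re-split
   * HFiles to the table's bulk output directory, and then copies them to the backup destination.
   * @param files the bulk loaded HFiles to merge/split and copy
   * @param tn    the table the files belong to
   * @param tgtFs the target (backup destination) file system
   */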
  private void mergeSplitAndCopyBulkloadedHFiles(List<String> files, TableName tn, FileSystem tgtFs)
    throws IOException {
    MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob();
    conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY,
      getBulkOutputDirForTable(tn).toString());
    player.setConf(conf);

    String inputDirs = StringUtils.join(files, ",");
    String[] args = { inputDirs, tn.getNameWithNamespaceInclAsString() };

    int result;

    try {
      result = player.run(args);
    } catch (Exception e) {
      LOG.error("Failed to run MapReduceHFileSplitterJob", e);
      // Delete the bulkload directory if we fail to run the HFile splitter job for any reason,
      // as the job might be re-tried
      deleteBulkLoadDirectory();
      throw new IOException(e);
    }

    if (result != 0) {
      throw new IOException(
        "Failed to run MapReduceHFileSplitterJob with invalid result: " + result);
    }

    incrementalCopyBulkloadHFiles(tgtFs, tn);
  }

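  /**
   * Moves paths that no longer exist in the active table directory (i.e. they have been archived
   * since the last check) from the active file list to the archive file list.
   * @param activeFiles  the list of active HFile paths, updated in place
   * @param archiveFiles the list of archived HFile paths, updated in place
   */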
  private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
    throws IOException {
    List<String> newlyArchived = new ArrayList<>();

    for (String spath : activeFiles) {
      if (!fs.exists(new Path(spath))) {
        newlyArchived.add(spath);
      }
    }

    if (newlyArchived.size() > 0) {
      activeFiles.removeAll(newlyArchived);
      archiveFiles.addAll(newlyArchived);
    }

    LOG.debug("{} files have been archived.", newlyArchived.size());
  }

  /**
   * @throws IOException                   If the execution of the backup fails
   * @throws ColumnFamilyMismatchException If the column families of the current table do not match
   *                                       the column families of the last full backup, in which
   *                                       case a full backup should be taken instead
   */
  @Override
  public void execute() throws IOException, ColumnFamilyMismatchException {
    try {
      Map<TableName, String> tablesToFullBackupIds = getFullBackupIds();
      verifyCfCompatibility(backupInfo.getTables(), tablesToFullBackupIds);

      // case PREPARE_INCREMENTAL:
      beginBackup(backupManager, backupInfo);
      backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
      LOG.debug("For incremental backup, current table set is "
        + backupManager.getIncrementalBackupTableSet());
      newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
    } catch (Exception e) {
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }

    // case INCREMENTAL_COPY:
    try {
      // copy out the table and region info files for each table
      BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
      setupRegionLocator();
      // convert WALs to HFiles and copy them to .tmp under BACKUP_ROOT
      convertWALsToHFiles();
      incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
        backupInfo.getBackupRootDir());
    } catch (Exception e) {
      String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
      // fail the overall backup and return
      failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
    // case INCR_BACKUP_COMPLETE:
    // Set the overall backup status to complete. After this checkpoint, the backup is considered
    // finished even if a cancellation is requested.
    try {
      // Set the previousTimestampMap, taken before the current log roll, on the manifest.
      Map<TableName, Map<String, Long>> previousTimestampMap = backupManager.readLogTimestampMap();
      backupInfo.setIncrTimestampMap(previousTimestampMap);

      // The table list in backupInfo is good for both full backup and incremental backup.
      // For incremental backup, it contains the incremental backup table set.
      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
        backupManager.readLogTimestampMap();

      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
      Long newStartCode =
        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
      backupManager.writeBackupStartCode(newStartCode);

      List<BulkLoad> bulkLoads = handleBulkLoad(backupInfo.getTableNames());

      // backup complete
      completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf);

      List<byte[]> bulkLoadedRows = Lists.transform(bulkLoads, BulkLoad::getRowKey);
      backupManager.deleteBulkLoadedRows(bulkLoadedRows);
    } catch (IOException e) {
      failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
        BackupType.INCREMENTAL, conf);
      throw new IOException(e);
    }
  }

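  /**
   * Copies the given source directories or files to the backup destination using the configured
   * {@link BackupCopyJob}, then deletes the temporary bulk output directory.
   * @param files      the source directories or files to copy
   * @param backupDest the backup destination
   */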
  protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
    try {
      LOG.debug("Incremental copy of HFiles is starting. dest=" + backupDest);
      // set overall backup phase: incremental_copy
      backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
      // get incremental backup file list and prepare parameters for DistCp
      String[] strArr = new String[files.length + 1];
      System.arraycopy(files, 0, strArr, 0, files.length);
      strArr[strArr.length - 1] = backupDest;

      String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting incremental copy HFiles job name to: " + jobname);
      }
      conf.set(JOB_NAME_CONF_KEY, jobname);

      BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
      int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
      if (res != 0) {
        LOG.error("Incremental copy of HFiles failed with return code: " + res + ".");
        throw new IOException(
          "Failed copy from " + StringUtils.join(files, ',') + " to " + backupDest);
      }
      LOG.debug("Incremental copy of HFiles from " + StringUtils.join(files, ',') + " to "
        + backupDest + " finished.");
    } finally {
      deleteBulkLoadDirectory();
    }
  }

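  /**
   * Deletes the temporary bulk output directory under the backup root, logging a warning if the
   * deletion fails.
   */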
  protected void deleteBulkLoadDirectory() throws IOException {
    // delete original bulk load directory on method exit
    Path path = getBulkOutputDir();
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    boolean result = fs.delete(path, true);
    if (!result) {
      LOG.warn("Could not delete " + path);
    }
  }

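  /**
   * Converts the WAL files collected for this incremental backup into HFiles by running
   * {@link WALPlayer} in bulk output mode for all tables in the incremental backup set.
   */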
  protected void convertWALsToHFiles() throws IOException {
    // get incremental backup file list and prepare parameters for DistCp
    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
    // Get list of tables in incremental backup set
    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
    // filter missing files out (they have been copied by previous backups)
    incrBackupFileList = filterMissingFiles(incrBackupFileList);
    List<String> tableList = new ArrayList<String>();
    for (TableName table : tableSet) {
      // Check if table exists
      if (tableExists(table, conn)) {
        tableList.add(table.getNameAsString());
      } else {
        LOG.warn("Table " + table + " does not exist. Skipping it in the WAL converter");
      }
    }
    walToHFiles(incrBackupFileList, tableList);
  }

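  /**
   * Checks whether the given table exists on the cluster.
   * @param table the table to check
   * @param conn  the connection to use
   * @return true if the table exists
   */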
  protected boolean tableExists(TableName table, Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(table);
    }
  }

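  /**
   * Runs {@link WALPlayer} over the given WAL directories, writing HFiles for the given tables to
   * the bulk output directory.
   * @param dirPaths  the WAL directories to convert
   * @param tableList the tables to produce HFiles for
   */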
  protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
    Tool player = new WALPlayer();

    // The player reads all files in an arbitrary directory structure and creates
    // a map task for each file. We use ';' as the separator
    // because WAL file names contain ','
    String dirs = StringUtils.join(dirPaths, ';');
    String jobname = "Incremental_Backup-" + backupId;

    Path bulkOutputPath = getBulkOutputDir();
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
    conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
    conf.set(JOB_NAME_CONF_KEY, jobname);
    String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };

    try {
      player.setConf(conf);
      int result = player.run(playerArgs);
      if (result != 0) {
        throw new IOException("WAL Player failed");
      }
      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
      conf.unset(JOB_NAME_CONF_KEY);
    } catch (IOException e) {
      throw e;
    } catch (Exception ee) {
      throw new IOException("Cannot convert WALs from directory " + dirs
        + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
    }
  }

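  /**
   * Copies the split bulk loaded HFiles for the given table from the bulk output directory to the
   * table's target directory under the backup destination, preserving the region/family layout.
   * @param tgtFs the target (backup destination) file system
   * @param tn    the table whose bulk loaded HFiles should be copied
   */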
  private void incrementalCopyBulkloadHFiles(FileSystem tgtFs, TableName tn) throws IOException {
    Path bulkOutDir = getBulkOutputDirForTable(tn);

    if (tgtFs.exists(bulkOutDir)) {
      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 2);
      Path tgtPath = getTargetDirForTable(tn);
      try {
        RemoteIterator<LocatedFileStatus> locatedFiles = tgtFs.listFiles(bulkOutDir, true);
        List<String> files = new ArrayList<>();
        while (locatedFiles.hasNext()) {
          LocatedFileStatus file = locatedFiles.next();
          if (file.isFile() && HFile.isHFileFormat(tgtFs, file.getPath())) {
            files.add(file.getPath().toString());
          }
        }
        incrementalCopyHFiles(files.toArray(new String[0]), tgtPath.toString());
      } finally {
        conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
      }
    }
  }

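  /**
   * Returns the bulk output directory for a specific table, laid out as
   * {@code <bulkOutputDir>/<namespace>/<qualifier>/data}.
   */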
  protected Path getBulkOutputDirForTable(TableName table) {
    Path tablePath = getBulkOutputDir();
    tablePath = new Path(tablePath, table.getNamespaceAsString());
    tablePath = new Path(tablePath, table.getQualifierAsString());
    return new Path(tablePath, "data");
  }

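  /**
   * Returns the temporary bulk output directory for this backup:
   * {@code <backupRootDir>/.tmp/<backupId>}.
   */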
  protected Path getBulkOutputDir() {
    String backupId = backupInfo.getBackupId();
    Path path = new Path(backupInfo.getBackupRootDir());
    path = new Path(path, ".tmp");
    path = new Path(path, backupId);
    return path;
  }

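  /**
   * Returns the destination directory for a table in the backup image:
   * {@code <backupRootDir>/<backupId>/<namespace>/<table>}.
   */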
  private Path getTargetDirForTable(TableName table) {
    Path path = new Path(backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId());
    path = new Path(path, table.getNamespaceAsString());
    path = new Path(path, table.getNameAsString());
    return path;
  }

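  /**
   * For each table in this backup, resolves the completed snapshot manifest directory of its last
   * full backup and registers it with {@link SnapshotRegionLocator} via the job configuration.
   */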
  private void setupRegionLocator() throws IOException {
    Map<TableName, String> fullBackupIds = getFullBackupIds();
    try (BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) {
      for (TableName tableName : backupInfo.getTables()) {
        String fullBackupId = fullBackupIds.get(tableName);
        BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(fullBackupId);
        String snapshotName = fullBackupInfo.getSnapshotName(tableName);
        Path root = HBackupFileSystem.getTableBackupPath(tableName,
          new Path(fullBackupInfo.getBackupRootDir()), fullBackupId);
        String manifestDir =
          SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root).toString();
        SnapshotRegionLocator.setSnapshotManifestDir(conf, manifestDir, tableName);
      }
    }
  }

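  /**
   * Builds a map from each table covered by this backup to the id of its most recent full backup,
   * derived from the ancestor images of the current backup.
   */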
  private Map<TableName, String> getFullBackupIds() throws IOException {
    // Ancestors are stored from newest to oldest, so we can iterate backwards
    // in order to populate our backupId map with the most recent full backup
    // for a given table
    List<BackupManifest.BackupImage> images = getAncestors(backupInfo);
    Map<TableName, String> results = new HashMap<>();
    for (int i = images.size() - 1; i >= 0; i--) {
      BackupManifest.BackupImage image = images.get(i);
      if (image.getType() != BackupType.FULL) {
        continue;
      }

      for (TableName tn : image.getTableNames()) {
        results.put(tn, image.getBackupId());
      }
    }
    return results;
  }

  /**
   * Verifies that the column families in the current table descriptors match the column families
   * recorded in the last full backup of those tables. This ensures CF compatibility across
   * incremental backups. If a mismatch is detected, a full table backup should be taken rather
   * than an incremental one.
   */
  private void verifyCfCompatibility(Set<TableName> tables,
    Map<TableName, String> tablesToFullBackupId) throws IOException, ColumnFamilyMismatchException {
    ColumnFamilyMismatchException.ColumnFamilyMismatchExceptionBuilder exBuilder =
      ColumnFamilyMismatchException.newBuilder();
    try (Admin admin = conn.getAdmin(); BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) {
      for (TableName tn : tables) {
        String backupId = tablesToFullBackupId.get(tn);
        BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(backupId);

        ColumnFamilyDescriptor[] currentCfs = admin.getDescriptor(tn).getColumnFamilies();
        String snapshotName = fullBackupInfo.getSnapshotName(tn);
        Path root = HBackupFileSystem.getTableBackupPath(tn,
          new Path(fullBackupInfo.getBackupRootDir()), fullBackupInfo.getBackupId());
        Path manifestDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root);

        FileSystem fs;
        try {
          fs = FileSystem.get(new URI(fullBackupInfo.getBackupRootDir()), conf);
        } catch (URISyntaxException e) {
          throw new IOException("Unable to get fs for backup " + fullBackupInfo.getBackupId(), e);
        }

        SnapshotProtos.SnapshotDescription snapshotDescription =
          SnapshotDescriptionUtils.readSnapshotInfo(fs, manifestDir);
        SnapshotManifest manifest =
          SnapshotManifest.open(conf, fs, manifestDir, snapshotDescription);

        ColumnFamilyDescriptor[] backupCfs = manifest.getTableDescriptor().getColumnFamilies();
        if (!areCfsCompatible(currentCfs, backupCfs)) {
          exBuilder.addMismatchedTable(tn, currentCfs, backupCfs);
        }
      }
    }

    ColumnFamilyMismatchException ex = exBuilder.build();
    if (!ex.getMismatchedTables().isEmpty()) {
      throw ex;
    }
  }

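  /**
   * Returns true if the two column family arrays have the same length and contain the same family
   * names in the same order.
   */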
  private static boolean areCfsCompatible(ColumnFamilyDescriptor[] currentCfs,
    ColumnFamilyDescriptor[] backupCfs) {
    if (currentCfs.length != backupCfs.length) {
      return false;
    }

    for (int i = 0; i < backupCfs.length; i++) {
      String currentCf = currentCfs[i].getNameAsString();
      String backupCf = backupCfs[i].getNameAsString();

      if (!currentCf.equals(backupCf)) {
        return false;
      }
    }

    return true;
  }
}