001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup.impl; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.nio.charset.StandardCharsets; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.Iterator; 030import java.util.List; 031import java.util.Map; 032import java.util.Map.Entry; 033import java.util.Objects; 034import java.util.Set; 035import java.util.TreeMap; 036import java.util.TreeSet; 037import java.util.stream.Collectors; 038import org.apache.commons.lang3.ArrayUtils; 039import org.apache.commons.lang3.StringUtils; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.hbase.Cell; 043import org.apache.hadoop.hbase.CellUtil; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.NamespaceDescriptor; 046import org.apache.hadoop.hbase.NamespaceExistException; 047import org.apache.hadoop.hbase.ServerName; 048import org.apache.hadoop.hbase.TableExistsException; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.TableNotDisabledException; 051import org.apache.hadoop.hbase.backup.BackupInfo; 052import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; 053import org.apache.hadoop.hbase.backup.BackupRestoreConstants; 054import org.apache.hadoop.hbase.backup.BackupType; 055import org.apache.hadoop.hbase.backup.util.BackupUtils; 056import org.apache.hadoop.hbase.client.Admin; 057import org.apache.hadoop.hbase.client.BufferedMutator; 058import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 060import org.apache.hadoop.hbase.client.Connection; 061import org.apache.hadoop.hbase.client.Delete; 062import org.apache.hadoop.hbase.client.Get; 063import org.apache.hadoop.hbase.client.Put; 064import org.apache.hadoop.hbase.client.Result; 065import org.apache.hadoop.hbase.client.ResultScanner; 066import org.apache.hadoop.hbase.client.Scan; 067import org.apache.hadoop.hbase.client.SnapshotDescription; 068import org.apache.hadoop.hbase.client.Table; 069import org.apache.hadoop.hbase.client.TableDescriptor; 070import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 071import org.apache.hadoop.hbase.util.Bytes; 072import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 073import org.apache.yetus.audience.InterfaceAudience; 074import org.slf4j.Logger; 075import org.slf4j.LoggerFactory; 076 077import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 078import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; 079 080import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 082 083/** 084 * This class provides API to access backup system table<br> 085 * Backup system table schema:<br> 086 * <p> 087 * <ul> 088 * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li> 089 * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li> 090 * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; table="meta:"+tablename of 091 * include table; value=empty</li> 092 * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; value = map[RS-> last WAL 093 * timestamp]</li> 094 * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li> 095 * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file 096 * name</li> 097 * </ul> 098 * </p> 099 */ 100@InterfaceAudience.Private 101public final class BackupSystemTable implements Closeable { 102 103 private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class); 104 105 static class WALItem { 106 String backupId; 107 String walFile; 108 String backupRoot; 109 110 WALItem(String backupId, String walFile, String backupRoot) { 111 this.backupId = backupId; 112 this.walFile = walFile; 113 this.backupRoot = backupRoot; 114 } 115 116 public String getBackupId() { 117 return backupId; 118 } 119 120 public String getWalFile() { 121 return walFile; 122 } 123 124 public String getBackupRoot() { 125 return backupRoot; 126 } 127 128 @Override 129 public String toString() { 130 return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile; 131 } 132 } 133 134 /** 135 * Backup system table (main) name 136 */ 137 private TableName tableName; 138 139 /** 140 * Backup System table name for bulk loaded files. We keep all bulk loaded file references in a 141 * separate table because we have to isolate general backup operations: create, merge etc from 142 * activity of RegionObserver, which controls process of a bulk loading 143 * {@link org.apache.hadoop.hbase.backup.BackupObserver} 144 */ 145 private TableName bulkLoadTableName; 146 147 /** 148 * Stores backup sessions (contexts) 149 */ 150 final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session"); 151 /** 152 * Stores other meta 153 */ 154 final static byte[] META_FAMILY = Bytes.toBytes("meta"); 155 final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk"); 156 /** 157 * Connection to HBase cluster, shared among all instances 158 */ 159 private final Connection connection; 160 161 private final static String BACKUP_INFO_PREFIX = "session:"; 162 private final static String START_CODE_ROW = "startcode:"; 163 private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:"); 164 private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c"); 165 166 private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes"); 167 private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no"); 168 169 private final static String INCR_BACKUP_SET = "incrbackupset:"; 170 private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; 171 private final static String RS_LOG_TS_PREFIX = "rslogts:"; 172 173 private final static String BULK_LOAD_PREFIX = "bulk:"; 174 private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX); 175 private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row"); 176 private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row"); 177 178 final static byte[] TBL_COL = Bytes.toBytes("tbl"); 179 final static byte[] FAM_COL = Bytes.toBytes("fam"); 180 final static byte[] PATH_COL = Bytes.toBytes("path"); 181 182 private final static String SET_KEY_PREFIX = "backupset:"; 183 184 // separator between BULK_LOAD_PREFIX and ordinals 185 private final static String BLK_LD_DELIM = ":"; 186 private final static byte[] EMPTY_VALUE = new byte[] {}; 187 188 // Safe delimiter in a string 189 private final static String NULL = "\u0000"; 190 191 public BackupSystemTable(Connection conn) throws IOException { 192 this.connection = conn; 193 Configuration conf = this.connection.getConfiguration(); 194 tableName = BackupSystemTable.getTableName(conf); 195 bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf); 196 checkSystemTable(); 197 } 198 199 private void checkSystemTable() throws IOException { 200 try (Admin admin = connection.getAdmin()) { 201 verifyNamespaceExists(admin); 202 Configuration conf = connection.getConfiguration(); 203 if (!admin.tableExists(tableName)) { 204 TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf); 205 createSystemTable(admin, backupHTD); 206 } 207 ensureTableEnabled(admin, tableName); 208 if (!admin.tableExists(bulkLoadTableName)) { 209 TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf); 210 createSystemTable(admin, blHTD); 211 } 212 ensureTableEnabled(admin, bulkLoadTableName); 213 waitForSystemTable(admin, tableName); 214 waitForSystemTable(admin, bulkLoadTableName); 215 } 216 } 217 218 private void createSystemTable(Admin admin, TableDescriptor descriptor) throws IOException { 219 try { 220 admin.createTable(descriptor); 221 } catch (TableExistsException e) { 222 // swallow because this class is initialized in concurrent environments (i.e. bulkloads), 223 // so may be subject to race conditions where one caller succeeds in creating the 224 // table and others fail because it now exists 225 LOG.debug("Table {} already exists, ignoring", descriptor.getTableName(), e); 226 } 227 } 228 229 private void verifyNamespaceExists(Admin admin) throws IOException { 230 String namespaceName = tableName.getNamespaceAsString(); 231 NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build(); 232 NamespaceDescriptor[] list = admin.listNamespaceDescriptors(); 233 boolean exists = false; 234 for (NamespaceDescriptor nsd : list) { 235 if (nsd.getName().equals(ns.getName())) { 236 exists = true; 237 break; 238 } 239 } 240 if (!exists) { 241 try { 242 admin.createNamespace(ns); 243 } catch (NamespaceExistException e) { 244 // swallow because this class is initialized in concurrent environments (i.e. bulkloads), 245 // so may be subject to race conditions where one caller succeeds in creating the 246 // namespace and others fail because it now exists 247 LOG.debug("Namespace {} already exists, ignoring", ns.getName(), e); 248 } 249 } 250 } 251 252 private void waitForSystemTable(Admin admin, TableName tableName) throws IOException { 253 // Return fast if the table is available and avoid a log message 254 if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) { 255 return; 256 } 257 long TIMEOUT = 60000; 258 long startTime = EnvironmentEdgeManager.currentTime(); 259 LOG.debug("Backup table {} is not present and available, waiting for it to become so", 260 tableName); 261 while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) { 262 try { 263 Thread.sleep(100); 264 } catch (InterruptedException e) { 265 throw (IOException) new InterruptedIOException().initCause(e); 266 } 267 if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) { 268 throw new IOException( 269 "Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms"); 270 } 271 } 272 LOG.debug("Backup table {} exists and available", tableName); 273 } 274 275 @Override 276 public void close() { 277 // do nothing 278 } 279 280 /** 281 * Updates status (state) of a backup session in backup system table table 282 * @param info backup info 283 * @throws IOException exception 284 */ 285 public void updateBackupInfo(BackupInfo info) throws IOException { 286 if (LOG.isTraceEnabled()) { 287 LOG.trace("update backup status in backup system table for: " + info.getBackupId() 288 + " set status=" + info.getState()); 289 } 290 try (Table table = connection.getTable(tableName)) { 291 Put put = createPutForBackupInfo(info); 292 table.put(put); 293 } 294 } 295 296 /* 297 * @param backupId the backup Id 298 * @return Map of rows to path of bulk loaded hfile 299 */ 300 Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException { 301 Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); 302 try (Table table = connection.getTable(bulkLoadTableName); 303 ResultScanner scanner = table.getScanner(scan)) { 304 Result res = null; 305 Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 306 while ((res = scanner.next()) != null) { 307 res.advance(); 308 byte[] row = CellUtil.cloneRow(res.listCells().get(0)); 309 for (Cell cell : res.listCells()) { 310 if ( 311 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 312 BackupSystemTable.PATH_COL.length) == 0 313 ) { 314 map.put(row, Bytes.toString(CellUtil.cloneValue(cell))); 315 } 316 } 317 } 318 return map; 319 } 320 } 321 322 /* 323 * Used during restore 324 * @param backupId the backup Id 325 * @param sTableList List of tables 326 * @return array of Map of family to List of Paths 327 */ 328 public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList) 329 throws IOException { 330 Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); 331 @SuppressWarnings("unchecked") 332 Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()]; 333 try (Table table = connection.getTable(bulkLoadTableName); 334 ResultScanner scanner = table.getScanner(scan)) { 335 Result res = null; 336 while ((res = scanner.next()) != null) { 337 res.advance(); 338 TableName tbl = null; 339 byte[] fam = null; 340 String path = null; 341 for (Cell cell : res.listCells()) { 342 if ( 343 CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0, 344 BackupSystemTable.TBL_COL.length) == 0 345 ) { 346 tbl = TableName.valueOf(CellUtil.cloneValue(cell)); 347 } else if ( 348 CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, 349 BackupSystemTable.FAM_COL.length) == 0 350 ) { 351 fam = CellUtil.cloneValue(cell); 352 } else if ( 353 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 354 BackupSystemTable.PATH_COL.length) == 0 355 ) { 356 path = Bytes.toString(CellUtil.cloneValue(cell)); 357 } 358 } 359 int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList); 360 if (srcIdx == -1) { 361 // the table is not among the query 362 continue; 363 } 364 if (mapForSrc[srcIdx] == null) { 365 mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR); 366 } 367 List<Path> files; 368 if (!mapForSrc[srcIdx].containsKey(fam)) { 369 files = new ArrayList<Path>(); 370 mapForSrc[srcIdx].put(fam, files); 371 } else { 372 files = mapForSrc[srcIdx].get(fam); 373 } 374 files.add(new Path(path)); 375 if (LOG.isDebugEnabled()) { 376 LOG.debug("found bulk loaded file : {} {} {}", tbl, Bytes.toString(fam), path); 377 } 378 } 379 380 return mapForSrc; 381 } 382 } 383 384 /** 385 * Deletes backup status from backup system table table 386 * @param backupId backup id 387 * @throws IOException exception 388 */ 389 public void deleteBackupInfo(String backupId) throws IOException { 390 if (LOG.isTraceEnabled()) { 391 LOG.trace("delete backup status in backup system table for " + backupId); 392 } 393 try (Table table = connection.getTable(tableName)) { 394 Delete del = createDeleteForBackupInfo(backupId); 395 table.delete(del); 396 } 397 } 398 399 /** 400 * Registers a bulk load. 401 * @param tableName table name 402 * @param region the region receiving hfile 403 * @param cfToHfilePath column family and associated hfiles 404 */ 405 public void registerBulkLoad(TableName tableName, byte[] region, 406 Map<byte[], List<Path>> cfToHfilePath) throws IOException { 407 if (LOG.isDebugEnabled()) { 408 LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName, 409 cfToHfilePath.size()); 410 } 411 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { 412 List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath); 413 bufferedMutator.mutate(puts); 414 LOG.debug("Written {} rows for bulk load of {}", puts.size(), tableName); 415 } 416 } 417 418 /* 419 * Removes rows recording bulk loaded hfiles from backup table 420 * @param lst list of table names 421 * @param rows the rows to be deleted 422 */ 423 public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException { 424 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { 425 List<Delete> lstDels = new ArrayList<>(); 426 for (byte[] row : rows) { 427 Delete del = new Delete(row); 428 lstDels.add(del); 429 LOG.debug("orig deleting the row: " + Bytes.toString(row)); 430 } 431 bufferedMutator.mutate(lstDels); 432 LOG.debug("deleted " + rows.size() + " original bulkload rows"); 433 } 434 } 435 436 /** 437 * Reads the rows from backup table recording bulk loaded hfiles 438 * @param tableList list of table names 439 */ 440 public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException { 441 List<BulkLoad> result = new ArrayList<>(); 442 for (TableName table : tableList) { 443 Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table); 444 try (Table bulkLoadTable = connection.getTable(bulkLoadTableName); 445 ResultScanner scanner = bulkLoadTable.getScanner(scan)) { 446 Result res; 447 while ((res = scanner.next()) != null) { 448 res.advance(); 449 String fam = null; 450 String path = null; 451 String region = null; 452 byte[] row = null; 453 for (Cell cell : res.listCells()) { 454 row = CellUtil.cloneRow(cell); 455 String rowStr = Bytes.toString(row); 456 region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr); 457 if ( 458 CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, 459 BackupSystemTable.FAM_COL.length) == 0 460 ) { 461 fam = Bytes.toString(CellUtil.cloneValue(cell)); 462 } else if ( 463 CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, 464 BackupSystemTable.PATH_COL.length) == 0 465 ) { 466 path = Bytes.toString(CellUtil.cloneValue(cell)); 467 } 468 } 469 result.add(new BulkLoad(table, region, fam, path, row)); 470 LOG.debug("found orig " + path + " for " + fam + " of table " + region); 471 } 472 } 473 } 474 return result; 475 } 476 477 /* 478 * @param sTableList List of tables 479 * @param maps array of Map of family to List of Paths 480 * @param backupId the backup Id 481 */ 482 public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps, 483 String backupId) throws IOException { 484 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) { 485 long ts = EnvironmentEdgeManager.currentTime(); 486 int cnt = 0; 487 List<Put> puts = new ArrayList<>(); 488 for (int idx = 0; idx < maps.length; idx++) { 489 Map<byte[], List<Path>> map = maps[idx]; 490 TableName tn = sTableList.get(idx); 491 492 if (map == null) { 493 continue; 494 } 495 496 for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) { 497 byte[] fam = entry.getKey(); 498 List<Path> paths = entry.getValue(); 499 for (Path p : paths) { 500 Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, 501 ts, cnt++); 502 puts.add(put); 503 } 504 } 505 } 506 if (!puts.isEmpty()) { 507 bufferedMutator.mutate(puts); 508 } 509 } 510 } 511 512 /** 513 * Reads backup status object (instance of backup info) from backup system table table 514 * @param backupId backup id 515 * @return Current status of backup session or null 516 */ 517 public BackupInfo readBackupInfo(String backupId) throws IOException { 518 if (LOG.isTraceEnabled()) { 519 LOG.trace("read backup status from backup system table for: " + backupId); 520 } 521 522 try (Table table = connection.getTable(tableName)) { 523 Get get = createGetForBackupInfo(backupId); 524 Result res = table.get(get); 525 if (res.isEmpty()) { 526 return null; 527 } 528 return resultToBackupInfo(res); 529 } 530 } 531 532 /** 533 * Read the last backup start code (timestamp) of last successful backup. Will return null if 534 * there is no start code stored on hbase or the value is of length 0. These two cases indicate 535 * there is no successful backup completed so far. 536 * @param backupRoot directory path to backup destination 537 * @return the timestamp of last successful backup 538 * @throws IOException exception 539 */ 540 public String readBackupStartCode(String backupRoot) throws IOException { 541 LOG.trace("read backup start code from backup system table"); 542 543 try (Table table = connection.getTable(tableName)) { 544 Get get = createGetForStartCode(backupRoot); 545 Result res = table.get(get); 546 if (res.isEmpty()) { 547 return null; 548 } 549 Cell cell = res.listCells().get(0); 550 byte[] val = CellUtil.cloneValue(cell); 551 if (val.length == 0) { 552 return null; 553 } 554 return new String(val, StandardCharsets.UTF_8); 555 } 556 } 557 558 /** 559 * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte. 560 * @param startCode start code 561 * @param backupRoot root directory path to backup 562 * @throws IOException exception 563 */ 564 public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException { 565 if (LOG.isTraceEnabled()) { 566 LOG.trace("write backup start code to backup system table " + startCode); 567 } 568 try (Table table = connection.getTable(tableName)) { 569 Put put = createPutForStartCode(startCode.toString(), backupRoot); 570 table.put(put); 571 } 572 } 573 574 /** 575 * Exclusive operations are: create, delete, merge 576 * @throws IOException if a table operation fails or an active backup exclusive operation is 577 * already underway 578 */ 579 public void startBackupExclusiveOperation() throws IOException { 580 LOG.debug("Start new backup exclusive operation"); 581 582 try (Table table = connection.getTable(tableName)) { 583 Put put = createPutForStartBackupSession(); 584 // First try to put if row does not exist 585 if ( 586 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 587 .ifNotExists().thenPut(put) 588 ) { 589 // Row exists, try to put if value == ACTIVE_SESSION_NO 590 if ( 591 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 592 .ifEquals(ACTIVE_SESSION_NO).thenPut(put) 593 ) { 594 throw new ExclusiveOperationException(); 595 } 596 } 597 } 598 } 599 600 private Put createPutForStartBackupSession() { 601 Put put = new Put(ACTIVE_SESSION_ROW); 602 put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES); 603 return put; 604 } 605 606 public void finishBackupExclusiveOperation() throws IOException { 607 LOG.debug("Finish backup exclusive operation"); 608 609 try (Table table = connection.getTable(tableName)) { 610 Put put = createPutForStopBackupSession(); 611 if ( 612 !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) 613 .ifEquals(ACTIVE_SESSION_YES).thenPut(put) 614 ) { 615 throw new IOException("There is no active backup exclusive operation"); 616 } 617 } 618 } 619 620 private Put createPutForStopBackupSession() { 621 Put put = new Put(ACTIVE_SESSION_ROW); 622 put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO); 623 return put; 624 } 625 626 /** 627 * Get the Region Servers log information after the last log roll from backup system table. 628 * @param backupRoot root directory path to backup 629 * @return RS log info 630 * @throws IOException exception 631 */ 632 public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot) 633 throws IOException { 634 LOG.trace("read region server last roll log result to backup system table"); 635 636 Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot); 637 638 try (Table table = connection.getTable(tableName); 639 ResultScanner scanner = table.getScanner(scan)) { 640 Result res; 641 HashMap<String, Long> rsTimestampMap = new HashMap<>(); 642 while ((res = scanner.next()) != null) { 643 res.advance(); 644 Cell cell = res.current(); 645 byte[] row = CellUtil.cloneRow(cell); 646 String server = getServerNameForReadRegionServerLastLogRollResult(row); 647 byte[] data = CellUtil.cloneValue(cell); 648 rsTimestampMap.put(server, Bytes.toLong(data)); 649 } 650 return rsTimestampMap; 651 } 652 } 653 654 /** 655 * Writes Region Server last roll log result (timestamp) to backup system table table 656 * @param server Region Server name 657 * @param ts last log timestamp 658 * @param backupRoot root directory path to backup 659 * @throws IOException exception 660 */ 661 public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot) 662 throws IOException { 663 LOG.trace("write region server last roll log result to backup system table"); 664 665 try (Table table = connection.getTable(tableName)) { 666 Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot); 667 table.put(put); 668 } 669 } 670 671 /** 672 * Get all completed backup information (in desc order by time) 673 * @param onlyCompleted true, if only successfully completed sessions 674 * @return history info of BackupCompleteData 675 * @throws IOException exception 676 */ 677 public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException { 678 LOG.trace("get backup history from backup system table"); 679 680 BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY; 681 ArrayList<BackupInfo> list = getBackupInfos(state); 682 return BackupUtils.sortHistoryListDesc(list); 683 } 684 685 /** 686 * Get all backups history 687 * @return list of backup info 688 * @throws IOException if getting the backup history fails 689 */ 690 public List<BackupInfo> getBackupHistory() throws IOException { 691 return getBackupHistory(false); 692 } 693 694 /** 695 * Get first n backup history records 696 * @param n number of records, if n== -1 - max number is ignored 697 * @return list of records 698 * @throws IOException if getting the backup history fails 699 */ 700 public List<BackupInfo> getHistory(int n) throws IOException { 701 List<BackupInfo> history = getBackupHistory(); 702 if (n == -1 || history.size() <= n) { 703 return history; 704 } 705 return Collections.unmodifiableList(history.subList(0, n)); 706 } 707 708 /** 709 * Get backup history records filtered by list of filters. 710 * @param n max number of records, if n == -1 , then max number is ignored 711 * @param filters list of filters 712 * @return backup records 713 * @throws IOException if getting the backup history fails 714 */ 715 public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException { 716 if (filters.length == 0) { 717 return getHistory(n); 718 } 719 720 List<BackupInfo> history = getBackupHistory(); 721 List<BackupInfo> result = new ArrayList<>(); 722 for (BackupInfo bi : history) { 723 if (n >= 0 && result.size() == n) { 724 break; 725 } 726 727 boolean passed = true; 728 for (int i = 0; i < filters.length; i++) { 729 if (!filters[i].apply(bi)) { 730 passed = false; 731 break; 732 } 733 } 734 if (passed) { 735 result.add(bi); 736 } 737 } 738 return result; 739 } 740 741 /** 742 * Retrieve all table names that are part of any known backup 743 */ 744 public Set<TableName> getTablesIncludedInBackups() throws IOException { 745 Set<TableName> names = new HashSet<>(); 746 List<BackupInfo> infos = getBackupHistory(true); 747 for (BackupInfo info : infos) { 748 // Incremental backups have the same tables as the preceding full backups 749 if (info.getType() == BackupType.FULL) { 750 names.addAll(info.getTableNames()); 751 } 752 } 753 return names; 754 } 755 756 /** 757 * Get history for backup destination 758 * @param backupRoot backup destination path 759 * @return List of backup info 760 * @throws IOException if getting the backup history fails 761 */ 762 public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException { 763 ArrayList<BackupInfo> history = getBackupHistory(false); 764 for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) { 765 BackupInfo info = iterator.next(); 766 if (!backupRoot.equals(info.getBackupRootDir())) { 767 iterator.remove(); 768 } 769 } 770 return history; 771 } 772 773 /** 774 * Get history for a table 775 * @param name table name 776 * @return history for a table 777 * @throws IOException if getting the backup history fails 778 */ 779 public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException { 780 List<BackupInfo> history = getBackupHistory(); 781 List<BackupInfo> tableHistory = new ArrayList<>(); 782 for (BackupInfo info : history) { 783 List<TableName> tables = info.getTableNames(); 784 if (tables.contains(name)) { 785 tableHistory.add(info); 786 } 787 } 788 return tableHistory; 789 } 790 791 /** 792 * Goes through all backup history corresponding to the provided root folder, and collects all 793 * backup info mentioning each of the provided tables. 794 * @param set the tables for which to collect the {@code BackupInfo} 795 * @param backupRoot backup destination path to retrieve backup history for 796 * @return a map containing (a subset of) the provided {@code TableName}s, mapped to a list of at 797 * least one {@code BackupInfo} 798 * @throws IOException if getting the backup history fails 799 */ 800 public Map<TableName, List<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set, 801 String backupRoot) throws IOException { 802 List<BackupInfo> history = getBackupHistory(backupRoot); 803 Map<TableName, List<BackupInfo>> tableHistoryMap = new HashMap<>(); 804 for (BackupInfo info : history) { 805 List<TableName> tables = info.getTableNames(); 806 for (TableName tableName : tables) { 807 if (set.contains(tableName)) { 808 List<BackupInfo> list = 809 tableHistoryMap.computeIfAbsent(tableName, k -> new ArrayList<>()); 810 list.add(info); 811 } 812 } 813 } 814 return tableHistoryMap; 815 } 816 817 /** 818 * Get all backup sessions with a given state (in descending order by time) 819 * @param state backup session state 820 * @return history info of backup info objects 821 * @throws IOException exception 822 */ 823 public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException { 824 LOG.trace("get backup infos from backup system table"); 825 826 Scan scan = createScanForBackupHistory(); 827 ArrayList<BackupInfo> list = new ArrayList<>(); 828 829 try (Table table = connection.getTable(tableName); 830 ResultScanner scanner = table.getScanner(scan)) { 831 Result res; 832 while ((res = scanner.next()) != null) { 833 res.advance(); 834 BackupInfo context = cellToBackupInfo(res.current()); 835 if (state != BackupState.ANY && context.getState() != state) { 836 continue; 837 } 838 list.add(context); 839 } 840 return list; 841 } 842 } 843 844 /** 845 * Write the current timestamps for each regionserver to backup system table after a successful 846 * full or incremental backup. The saved timestamp is of the last log file that was backed up 847 * already. 848 * @param tables tables 849 * @param newTimestamps timestamps 850 * @param backupRoot root directory path to backup 851 * @throws IOException exception 852 */ 853 public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps, 854 String backupRoot) throws IOException { 855 if (LOG.isTraceEnabled()) { 856 LOG.trace("write RS log time stamps to backup system table for tables [" 857 + StringUtils.join(tables, ",") + "]"); 858 } 859 List<Put> puts = new ArrayList<>(); 860 for (TableName table : tables) { 861 byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray(); 862 Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot); 863 puts.add(put); 864 } 865 try (BufferedMutator bufferedMutator = connection.getBufferedMutator(tableName)) { 866 bufferedMutator.mutate(puts); 867 } 868 } 869 870 /** 871 * Read the timestamp for each region server log after the last successful backup. Each table has 872 * its own set of the timestamps. The info is stored for each table as a concatenated string of 873 * rs->timestapmp 874 * @param backupRoot root directory path to backup 875 * @return the timestamp for each region server. key: tableName value: 876 * RegionServer,PreviousTimeStamp 877 * @throws IOException exception 878 */ 879 public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot) 880 throws IOException { 881 if (LOG.isTraceEnabled()) { 882 LOG.trace("read RS log ts from backup system table for root=" + backupRoot); 883 } 884 885 Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>(); 886 887 Scan scan = createScanForReadLogTimestampMap(backupRoot); 888 try (Table table = connection.getTable(tableName); 889 ResultScanner scanner = table.getScanner(scan)) { 890 Result res; 891 while ((res = scanner.next()) != null) { 892 res.advance(); 893 Cell cell = res.current(); 894 byte[] row = CellUtil.cloneRow(cell); 895 String tabName = getTableNameForReadLogTimestampMap(row); 896 TableName tn = TableName.valueOf(tabName); 897 byte[] data = CellUtil.cloneValue(cell); 898 if (data == null) { 899 throw new IOException("Data of last backup data from backup system table " 900 + "is empty. Create a backup first."); 901 } 902 if (data != null && data.length > 0) { 903 HashMap<String, Long> lastBackup = 904 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data)); 905 tableTimestampMap.put(tn, lastBackup); 906 } 907 } 908 return tableTimestampMap; 909 } 910 } 911 912 private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table, 913 Map<String, Long> map) { 914 BackupProtos.TableServerTimestamp.Builder tstBuilder = 915 BackupProtos.TableServerTimestamp.newBuilder(); 916 tstBuilder 917 .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table)); 918 919 for (Entry<String, Long> entry : map.entrySet()) { 920 BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder(); 921 HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder(); 922 ServerName sn = ServerName.parseServerName(entry.getKey()); 923 snBuilder.setHostName(sn.getHostname()); 924 snBuilder.setPort(sn.getPort()); 925 builder.setServerName(snBuilder.build()); 926 builder.setTimestamp(entry.getValue()); 927 tstBuilder.addServerTimestamp(builder.build()); 928 } 929 930 return tstBuilder.build(); 931 } 932 933 private HashMap<String, Long> 934 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) { 935 936 HashMap<String, Long> map = new HashMap<>(); 937 List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList(); 938 for (BackupProtos.ServerTimestamp st : list) { 939 ServerName sn = 940 org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName()); 941 map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp()); 942 } 943 return map; 944 } 945 946 /** 947 * Return the current tables covered by incremental backup. 948 * @param backupRoot root directory path to backup 949 * @return set of tableNames 950 * @throws IOException exception 951 */ 952 public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException { 953 LOG.trace("get incremental backup table set from backup system table"); 954 955 TreeSet<TableName> set = new TreeSet<>(); 956 957 try (Table table = connection.getTable(tableName)) { 958 Get get = createGetForIncrBackupTableSet(backupRoot); 959 Result res = table.get(get); 960 if (res.isEmpty()) { 961 return set; 962 } 963 List<Cell> cells = res.listCells(); 964 for (Cell cell : cells) { 965 // qualifier = table name - we use table names as qualifiers 966 set.add(TableName.valueOf(CellUtil.cloneQualifier(cell))); 967 } 968 return set; 969 } 970 } 971 972 /** 973 * Add tables to global incremental backup set 974 * @param tables set of tables 975 * @param backupRoot root directory path to backup 976 * @throws IOException exception 977 */ 978 public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot) 979 throws IOException { 980 if (LOG.isTraceEnabled()) { 981 LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot 982 + " tables [" + StringUtils.join(tables, " ") + "]"); 983 } 984 if (LOG.isDebugEnabled()) { 985 tables.forEach(table -> LOG.debug(Objects.toString(table))); 986 } 987 try (Table table = connection.getTable(tableName)) { 988 Put put = createPutForIncrBackupTableSet(tables, backupRoot); 989 table.put(put); 990 } 991 } 992 993 /** 994 * Deletes incremental backup set for a backup destination 995 * @param backupRoot backup root 996 */ 997 public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException { 998 if (LOG.isTraceEnabled()) { 999 LOG.trace("Delete incremental backup table set to backup system table. ROOT=" + backupRoot); 1000 } 1001 try (Table table = connection.getTable(tableName)) { 1002 Delete delete = createDeleteForIncrBackupTableSet(backupRoot); 1003 table.delete(delete); 1004 } 1005 } 1006 1007 /** 1008 * Checks if we have at least one backup session in backup system table This API is used by 1009 * BackupLogCleaner 1010 * @return true, if - at least one session exists in backup system table table 1011 * @throws IOException exception 1012 */ 1013 public boolean hasBackupSessions() throws IOException { 1014 LOG.trace("Has backup sessions from backup system table"); 1015 1016 boolean result = false; 1017 Scan scan = createScanForBackupHistory(); 1018 scan.setCaching(1); 1019 try (Table table = connection.getTable(tableName); 1020 ResultScanner scanner = table.getScanner(scan)) { 1021 if (scanner.next() != null) { 1022 result = true; 1023 } 1024 return result; 1025 } 1026 } 1027 1028 /** 1029 * BACKUP SETS 1030 */ 1031 1032 /** 1033 * Get backup set list 1034 * @return backup set list 1035 * @throws IOException if a table or scanner operation fails 1036 */ 1037 public List<String> listBackupSets() throws IOException { 1038 LOG.trace("Backup set list"); 1039 1040 List<String> list = new ArrayList<>(); 1041 try (Table table = connection.getTable(tableName)) { 1042 Scan scan = createScanForBackupSetList(); 1043 scan.readVersions(1); 1044 try (ResultScanner scanner = table.getScanner(scan)) { 1045 Result res; 1046 while ((res = scanner.next()) != null) { 1047 res.advance(); 1048 list.add(cellKeyToBackupSetName(res.current())); 1049 } 1050 return list; 1051 } 1052 } 1053 } 1054 1055 /** 1056 * Get backup set description (list of tables) 1057 * @param name set's name 1058 * @return list of tables in a backup set 1059 * @throws IOException if a table operation fails 1060 */ 1061 public List<TableName> describeBackupSet(String name) throws IOException { 1062 if (LOG.isTraceEnabled()) { 1063 LOG.trace(" Backup set describe: " + name); 1064 } 1065 try (Table table = connection.getTable(tableName)) { 1066 Get get = createGetForBackupSet(name); 1067 Result res = table.get(get); 1068 if (res.isEmpty()) { 1069 return null; 1070 } 1071 res.advance(); 1072 String[] tables = cellValueToBackupSet(res.current()); 1073 return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item)) 1074 .collect(Collectors.toList()); 1075 } 1076 } 1077 1078 /** 1079 * Add backup set (list of tables) 1080 * @param name set name 1081 * @param newTables list of tables, comma-separated 1082 * @throws IOException if a table operation fails 1083 */ 1084 public void addToBackupSet(String name, String[] newTables) throws IOException { 1085 if (LOG.isTraceEnabled()) { 1086 LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]"); 1087 } 1088 String[] union = null; 1089 try (Table table = connection.getTable(tableName)) { 1090 Get get = createGetForBackupSet(name); 1091 Result res = table.get(get); 1092 if (res.isEmpty()) { 1093 union = newTables; 1094 } else { 1095 res.advance(); 1096 String[] tables = cellValueToBackupSet(res.current()); 1097 union = merge(tables, newTables); 1098 } 1099 Put put = createPutForBackupSet(name, union); 1100 table.put(put); 1101 } 1102 } 1103 1104 /** 1105 * Remove tables from backup set (list of tables) 1106 * @param name set name 1107 * @param toRemove list of tables 1108 * @throws IOException if a table operation or deleting the backup set fails 1109 */ 1110 public void removeFromBackupSet(String name, String[] toRemove) throws IOException { 1111 if (LOG.isTraceEnabled()) { 1112 LOG.trace( 1113 " Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]"); 1114 } 1115 String[] disjoint; 1116 String[] tables; 1117 try (Table table = connection.getTable(tableName)) { 1118 Get get = createGetForBackupSet(name); 1119 Result res = table.get(get); 1120 if (res.isEmpty()) { 1121 LOG.warn("Backup set '" + name + "' not found."); 1122 return; 1123 } else { 1124 res.advance(); 1125 tables = cellValueToBackupSet(res.current()); 1126 disjoint = disjoin(tables, toRemove); 1127 } 1128 if (disjoint.length > 0 && disjoint.length != tables.length) { 1129 Put put = createPutForBackupSet(name, disjoint); 1130 table.put(put); 1131 } else if (disjoint.length == tables.length) { 1132 LOG.warn("Backup set '" + name + "' does not contain tables [" 1133 + StringUtils.join(toRemove, " ") + "]"); 1134 } else { // disjoint.length == 0 and tables.length >0 1135 // Delete backup set 1136 LOG.info("Backup set '" + name + "' is empty. Deleting."); 1137 deleteBackupSet(name); 1138 } 1139 } 1140 } 1141 1142 private String[] merge(String[] existingTables, String[] newTables) { 1143 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 1144 tables.addAll(Arrays.asList(newTables)); 1145 return tables.toArray(new String[0]); 1146 } 1147 1148 private String[] disjoin(String[] existingTables, String[] toRemove) { 1149 Set<String> tables = new HashSet<>(Arrays.asList(existingTables)); 1150 Arrays.asList(toRemove).forEach(table -> tables.remove(table)); 1151 return tables.toArray(new String[0]); 1152 } 1153 1154 /** 1155 * Delete backup set 1156 * @param name set's name 1157 * @throws IOException if getting or deleting the table fails 1158 */ 1159 public void deleteBackupSet(String name) throws IOException { 1160 if (LOG.isTraceEnabled()) { 1161 LOG.trace(" Backup set delete: " + name); 1162 } 1163 try (Table table = connection.getTable(tableName)) { 1164 Delete del = createDeleteForBackupSet(name); 1165 table.delete(del); 1166 } 1167 } 1168 1169 /** 1170 * Get backup system table descriptor 1171 * @return table's descriptor 1172 */ 1173 public static TableDescriptor getSystemTableDescriptor(Configuration conf) { 1174 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf)); 1175 1176 ColumnFamilyDescriptorBuilder colBuilder = 1177 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 1178 1179 colBuilder.setMaxVersions(1); 1180 Configuration config = HBaseConfiguration.create(); 1181 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 1182 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 1183 colBuilder.setTimeToLive(ttl); 1184 1185 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 1186 builder.setColumnFamily(colSessionsDesc); 1187 1188 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 1189 colBuilder.setTimeToLive(ttl); 1190 builder.setColumnFamily(colBuilder.build()); 1191 return builder.build(); 1192 } 1193 1194 public static TableName getTableName(Configuration conf) { 1195 String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, 1196 BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT); 1197 return TableName.valueOf(name); 1198 } 1199 1200 public static String getTableNameAsString(Configuration conf) { 1201 return getTableName(conf).getNameAsString(); 1202 } 1203 1204 public static String getSnapshotName(Configuration conf) { 1205 return "snapshot_" + getTableNameAsString(conf).replace(":", "_"); 1206 } 1207 1208 /** 1209 * Get backup system table descriptor 1210 * @return table's descriptor 1211 */ 1212 public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) { 1213 TableDescriptorBuilder builder = 1214 TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf)); 1215 1216 ColumnFamilyDescriptorBuilder colBuilder = 1217 ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); 1218 colBuilder.setMaxVersions(1); 1219 Configuration config = HBaseConfiguration.create(); 1220 int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, 1221 BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); 1222 colBuilder.setTimeToLive(ttl); 1223 ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); 1224 builder.setColumnFamily(colSessionsDesc); 1225 colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); 1226 colBuilder.setTimeToLive(ttl); 1227 builder.setColumnFamily(colBuilder.build()); 1228 return builder.build(); 1229 } 1230 1231 public static TableName getTableNameForBulkLoadedData(Configuration conf) { 1232 String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, 1233 BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk"; 1234 return TableName.valueOf(name); 1235 } 1236 1237 /** 1238 * Creates Put operation for a given backup info object 1239 * @param context backup info 1240 * @return put operation 1241 * @throws IOException exception 1242 */ 1243 private Put createPutForBackupInfo(BackupInfo context) throws IOException { 1244 Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId())); 1245 put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"), 1246 context.toByteArray()); 1247 return put; 1248 } 1249 1250 /** 1251 * Creates Get operation for a given backup id 1252 * @param backupId backup's ID 1253 * @return get operation 1254 * @throws IOException exception 1255 */ 1256 private Get createGetForBackupInfo(String backupId) throws IOException { 1257 Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId)); 1258 get.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1259 get.readVersions(1); 1260 return get; 1261 } 1262 1263 /** 1264 * Creates Delete operation for a given backup id 1265 * @param backupId backup's ID 1266 * @return delete operation 1267 */ 1268 private Delete createDeleteForBackupInfo(String backupId) { 1269 Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId)); 1270 del.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1271 return del; 1272 } 1273 1274 /** 1275 * Converts Result to BackupInfo 1276 * @param res HBase result 1277 * @return backup info instance 1278 * @throws IOException exception 1279 */ 1280 private BackupInfo resultToBackupInfo(Result res) throws IOException { 1281 res.advance(); 1282 Cell cell = res.current(); 1283 return cellToBackupInfo(cell); 1284 } 1285 1286 /** 1287 * Creates Get operation to retrieve start code from backup system table 1288 * @return get operation 1289 * @throws IOException exception 1290 */ 1291 private Get createGetForStartCode(String rootPath) throws IOException { 1292 Get get = new Get(rowkey(START_CODE_ROW, rootPath)); 1293 get.addFamily(BackupSystemTable.META_FAMILY); 1294 get.readVersions(1); 1295 return get; 1296 } 1297 1298 /** 1299 * Creates Put operation to store start code to backup system table 1300 * @return put operation 1301 */ 1302 private Put createPutForStartCode(String startCode, String rootPath) { 1303 Put put = new Put(rowkey(START_CODE_ROW, rootPath)); 1304 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"), 1305 Bytes.toBytes(startCode)); 1306 return put; 1307 } 1308 1309 /** 1310 * Creates Get to retrieve incremental backup table set from backup system table 1311 * @return get operation 1312 * @throws IOException exception 1313 */ 1314 private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException { 1315 Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot)); 1316 get.addFamily(BackupSystemTable.META_FAMILY); 1317 get.readVersions(1); 1318 return get; 1319 } 1320 1321 /** 1322 * Creates Put to store incremental backup table set 1323 * @param tables tables 1324 * @return put operation 1325 */ 1326 private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) { 1327 Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot)); 1328 for (TableName table : tables) { 1329 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()), 1330 EMPTY_VALUE); 1331 } 1332 return put; 1333 } 1334 1335 /** 1336 * Creates Delete for incremental backup table set 1337 * @param backupRoot backup root 1338 * @return delete operation 1339 */ 1340 private Delete createDeleteForIncrBackupTableSet(String backupRoot) { 1341 Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot)); 1342 delete.addFamily(BackupSystemTable.META_FAMILY); 1343 return delete; 1344 } 1345 1346 /** 1347 * Creates Scan operation to load backup history 1348 * @return scan operation 1349 */ 1350 private Scan createScanForBackupHistory() { 1351 Scan scan = new Scan(); 1352 byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX); 1353 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1354 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1355 scan.withStartRow(startRow); 1356 scan.withStopRow(stopRow); 1357 scan.addFamily(BackupSystemTable.SESSIONS_FAMILY); 1358 scan.readVersions(1); 1359 return scan; 1360 } 1361 1362 /** 1363 * Converts cell to backup info instance. 1364 * @param current current cell 1365 * @return backup backup info instance 1366 * @throws IOException exception 1367 */ 1368 private BackupInfo cellToBackupInfo(Cell current) throws IOException { 1369 byte[] data = CellUtil.cloneValue(current); 1370 return BackupInfo.fromByteArray(data); 1371 } 1372 1373 /** 1374 * Creates Put to write RS last roll log timestamp map 1375 * @param table table 1376 * @param smap map, containing RS:ts 1377 * @return put operation 1378 */ 1379 private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap, 1380 String backupRoot) { 1381 Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString())); 1382 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap); 1383 return put; 1384 } 1385 1386 /** 1387 * Creates Scan to load table-> { RS -> ts} map of maps 1388 * @return scan operation 1389 */ 1390 private Scan createScanForReadLogTimestampMap(String backupRoot) { 1391 Scan scan = new Scan(); 1392 scan.setStartStopRowForPrefixScan(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL)); 1393 scan.addFamily(BackupSystemTable.META_FAMILY); 1394 1395 return scan; 1396 } 1397 1398 /** 1399 * Get table name from rowkey 1400 * @param cloneRow rowkey 1401 * @return table name 1402 */ 1403 private String getTableNameForReadLogTimestampMap(byte[] cloneRow) { 1404 String s = Bytes.toString(cloneRow); 1405 int index = s.lastIndexOf(NULL); 1406 return s.substring(index + 1); 1407 } 1408 1409 /** 1410 * Creates Put to store RS last log result 1411 * @param server server name 1412 * @param timestamp log roll result (timestamp) 1413 * @return put operation 1414 */ 1415 private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp, 1416 String backupRoot) { 1417 Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server)); 1418 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"), 1419 Bytes.toBytes(timestamp)); 1420 return put; 1421 } 1422 1423 /** 1424 * Creates Scan operation to load last RS log roll results 1425 * @return scan operation 1426 */ 1427 private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) { 1428 Scan scan = new Scan(); 1429 scan.setStartStopRowForPrefixScan(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL)); 1430 scan.addFamily(BackupSystemTable.META_FAMILY); 1431 scan.readVersions(1); 1432 1433 return scan; 1434 } 1435 1436 /** 1437 * Get server's name from rowkey 1438 * @param row rowkey 1439 * @return server's name 1440 */ 1441 private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) { 1442 String s = Bytes.toString(row); 1443 int index = s.lastIndexOf(NULL); 1444 return s.substring(index + 1); 1445 } 1446 1447 /** 1448 * Creates Put's for bulk loads. 1449 */ 1450 private static List<Put> createPutForBulkLoad(TableName table, byte[] region, 1451 Map<byte[], List<Path>> columnFamilyToHFilePaths) { 1452 List<Put> puts = new ArrayList<>(); 1453 for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) { 1454 for (Path path : entry.getValue()) { 1455 String file = path.toString(); 1456 int lastSlash = file.lastIndexOf("/"); 1457 String filename = file.substring(lastSlash + 1); 1458 Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, 1459 Bytes.toString(region), BLK_LD_DELIM, filename)); 1460 put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); 1461 put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey()); 1462 put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file)); 1463 puts.add(put); 1464 LOG.debug("Done writing bulk path {} for {} {}", file, table, Bytes.toString(region)); 1465 } 1466 } 1467 return puts; 1468 } 1469 1470 public static void snapshot(Connection conn) throws IOException { 1471 try (Admin admin = conn.getAdmin()) { 1472 Configuration conf = conn.getConfiguration(); 1473 admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf)); 1474 } 1475 } 1476 1477 public static void restoreFromSnapshot(Connection conn) throws IOException { 1478 Configuration conf = conn.getConfiguration(); 1479 LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot"); 1480 try (Admin admin = conn.getAdmin()) { 1481 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1482 if (snapshotExists(admin, snapshotName)) { 1483 admin.disableTable(BackupSystemTable.getTableName(conf)); 1484 admin.restoreSnapshot(snapshotName); 1485 admin.enableTable(BackupSystemTable.getTableName(conf)); 1486 LOG.debug("Done restoring backup system table"); 1487 } else { 1488 // Snapshot does not exists, i.e completeBackup failed after 1489 // deleting backup system table snapshot 1490 // In this case we log WARN and proceed 1491 LOG.warn( 1492 "Could not restore backup system table. Snapshot " + snapshotName + " does not exists."); 1493 } 1494 } 1495 } 1496 1497 private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException { 1498 List<SnapshotDescription> list = admin.listSnapshots(); 1499 for (SnapshotDescription desc : list) { 1500 if (desc.getName().equals(snapshotName)) { 1501 return true; 1502 } 1503 } 1504 return false; 1505 } 1506 1507 public static boolean snapshotExists(Connection conn) throws IOException { 1508 return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration())); 1509 } 1510 1511 public static void deleteSnapshot(Connection conn) throws IOException { 1512 Configuration conf = conn.getConfiguration(); 1513 LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system"); 1514 try (Admin admin = conn.getAdmin()) { 1515 String snapshotName = BackupSystemTable.getSnapshotName(conf); 1516 if (snapshotExists(admin, snapshotName)) { 1517 admin.deleteSnapshot(snapshotName); 1518 LOG.debug("Done deleting backup system table snapshot"); 1519 } else { 1520 LOG.error("Snapshot " + snapshotName + " does not exists"); 1521 } 1522 } 1523 } 1524 1525 public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) { 1526 List<Delete> lstDels = new ArrayList<>(lst.size()); 1527 for (TableName table : lst) { 1528 Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM)); 1529 del.addFamily(BackupSystemTable.META_FAMILY); 1530 lstDels.add(del); 1531 } 1532 return lstDels; 1533 } 1534 1535 private Put createPutForDeleteOperation(String[] backupIdList) { 1536 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1537 Put put = new Put(DELETE_OP_ROW); 1538 put.addColumn(META_FAMILY, FAM_COL, value); 1539 return put; 1540 } 1541 1542 private Delete createDeleteForBackupDeleteOperation() { 1543 Delete delete = new Delete(DELETE_OP_ROW); 1544 delete.addFamily(META_FAMILY); 1545 return delete; 1546 } 1547 1548 private Get createGetForDeleteOperation() { 1549 Get get = new Get(DELETE_OP_ROW); 1550 get.addFamily(META_FAMILY); 1551 return get; 1552 } 1553 1554 public void startDeleteOperation(String[] backupIdList) throws IOException { 1555 if (LOG.isTraceEnabled()) { 1556 LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList)); 1557 } 1558 Put put = createPutForDeleteOperation(backupIdList); 1559 try (Table table = connection.getTable(tableName)) { 1560 table.put(put); 1561 } 1562 } 1563 1564 public void finishDeleteOperation() throws IOException { 1565 LOG.trace("Finsih delete operation for backup ids"); 1566 1567 Delete delete = createDeleteForBackupDeleteOperation(); 1568 try (Table table = connection.getTable(tableName)) { 1569 table.delete(delete); 1570 } 1571 } 1572 1573 public String[] getListOfBackupIdsFromDeleteOperation() throws IOException { 1574 LOG.trace("Get delete operation for backup ids"); 1575 1576 Get get = createGetForDeleteOperation(); 1577 try (Table table = connection.getTable(tableName)) { 1578 Result res = table.get(get); 1579 if (res.isEmpty()) { 1580 return null; 1581 } 1582 Cell cell = res.listCells().get(0); 1583 byte[] val = CellUtil.cloneValue(cell); 1584 if (val.length == 0) { 1585 return null; 1586 } 1587 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1588 .toArray(String[]::new); 1589 } 1590 } 1591 1592 private Put createPutForMergeOperation(String[] backupIdList) { 1593 byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); 1594 Put put = new Put(MERGE_OP_ROW); 1595 put.addColumn(META_FAMILY, FAM_COL, value); 1596 return put; 1597 } 1598 1599 public boolean isMergeInProgress() throws IOException { 1600 Get get = new Get(MERGE_OP_ROW); 1601 try (Table table = connection.getTable(tableName)) { 1602 Result res = table.get(get); 1603 return !res.isEmpty(); 1604 } 1605 } 1606 1607 private Put createPutForUpdateTablesForMerge(List<TableName> tables) { 1608 byte[] value = Bytes.toBytes(StringUtils.join(tables, ",")); 1609 Put put = new Put(MERGE_OP_ROW); 1610 put.addColumn(META_FAMILY, PATH_COL, value); 1611 return put; 1612 } 1613 1614 private Delete createDeleteForBackupMergeOperation() { 1615 Delete delete = new Delete(MERGE_OP_ROW); 1616 delete.addFamily(META_FAMILY); 1617 return delete; 1618 } 1619 1620 private Get createGetForMergeOperation() { 1621 Get get = new Get(MERGE_OP_ROW); 1622 get.addFamily(META_FAMILY); 1623 return get; 1624 } 1625 1626 public void startMergeOperation(String[] backupIdList) throws IOException { 1627 if (LOG.isTraceEnabled()) { 1628 LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList)); 1629 } 1630 Put put = createPutForMergeOperation(backupIdList); 1631 try (Table table = connection.getTable(tableName)) { 1632 table.put(put); 1633 } 1634 } 1635 1636 public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException { 1637 if (LOG.isTraceEnabled()) { 1638 LOG.trace("Update tables for merge : " + StringUtils.join(tables, ",")); 1639 } 1640 Put put = createPutForUpdateTablesForMerge(tables); 1641 try (Table table = connection.getTable(tableName)) { 1642 table.put(put); 1643 } 1644 } 1645 1646 public void finishMergeOperation() throws IOException { 1647 LOG.trace("Finish merge operation for backup ids"); 1648 1649 Delete delete = createDeleteForBackupMergeOperation(); 1650 try (Table table = connection.getTable(tableName)) { 1651 table.delete(delete); 1652 } 1653 } 1654 1655 public String[] getListOfBackupIdsFromMergeOperation() throws IOException { 1656 LOG.trace("Get backup ids for merge operation"); 1657 1658 Get get = createGetForMergeOperation(); 1659 try (Table table = connection.getTable(tableName)) { 1660 Result res = table.get(get); 1661 if (res.isEmpty()) { 1662 return null; 1663 } 1664 Cell cell = res.listCells().get(0); 1665 byte[] val = CellUtil.cloneValue(cell); 1666 if (val.length == 0) { 1667 return null; 1668 } 1669 return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8)) 1670 .toArray(String[]::new); 1671 } 1672 } 1673 1674 static Scan createScanForOrigBulkLoadedFiles(TableName table) { 1675 Scan scan = new Scan(); 1676 byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM); 1677 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1678 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1679 scan.withStartRow(startRow); 1680 scan.withStopRow(stopRow); 1681 scan.addFamily(BackupSystemTable.META_FAMILY); 1682 scan.readVersions(1); 1683 return scan; 1684 } 1685 1686 static String getTableNameFromOrigBulkLoadRow(String rowStr) { 1687 // format is bulk : namespace : table : region : file 1688 return Iterators.get(Splitter.onPattern(BLK_LD_DELIM).split(rowStr).iterator(), 1); 1689 } 1690 1691 static String getRegionNameFromOrigBulkLoadRow(String rowStr) { 1692 // format is bulk : namespace : table : region : file 1693 List<String> parts = Splitter.onPattern(BLK_LD_DELIM).splitToList(rowStr); 1694 Iterator<String> i = parts.iterator(); 1695 int idx = 3; 1696 if (parts.size() == 4) { 1697 // the table is in default namespace 1698 idx = 2; 1699 } 1700 String region = Iterators.get(i, idx); 1701 LOG.debug("bulk row string " + rowStr + " region " + region); 1702 return region; 1703 } 1704 1705 /* 1706 * Used to query bulk loaded hfiles which have been copied by incremental backup 1707 * @param backupId the backup Id. It can be null when querying for all tables 1708 * @return the Scan object 1709 */ 1710 static Scan createScanForBulkLoadedFiles(String backupId) { 1711 Scan scan = new Scan(); 1712 byte[] startRow = 1713 backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM); 1714 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1715 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1716 scan.withStartRow(startRow); 1717 scan.withStopRow(stopRow); 1718 scan.addFamily(BackupSystemTable.META_FAMILY); 1719 scan.readVersions(1); 1720 return scan; 1721 } 1722 1723 static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId, 1724 long ts, int idx) { 1725 Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx)); 1726 put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName()); 1727 put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam); 1728 put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(p)); 1729 return put; 1730 } 1731 1732 /** 1733 * Creates Scan operation to load backup set list 1734 * @return scan operation 1735 */ 1736 private Scan createScanForBackupSetList() { 1737 Scan scan = new Scan(); 1738 byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX); 1739 byte[] stopRow = Arrays.copyOf(startRow, startRow.length); 1740 stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); 1741 scan.withStartRow(startRow); 1742 scan.withStopRow(stopRow); 1743 scan.addFamily(BackupSystemTable.META_FAMILY); 1744 return scan; 1745 } 1746 1747 /** 1748 * Creates Get operation to load backup set content 1749 * @return get operation 1750 */ 1751 private Get createGetForBackupSet(String name) { 1752 Get get = new Get(rowkey(SET_KEY_PREFIX, name)); 1753 get.addFamily(BackupSystemTable.META_FAMILY); 1754 return get; 1755 } 1756 1757 /** 1758 * Creates Delete operation to delete backup set content 1759 * @param name backup set's name 1760 * @return delete operation 1761 */ 1762 private Delete createDeleteForBackupSet(String name) { 1763 Delete del = new Delete(rowkey(SET_KEY_PREFIX, name)); 1764 del.addFamily(BackupSystemTable.META_FAMILY); 1765 return del; 1766 } 1767 1768 /** 1769 * Creates Put operation to update backup set content 1770 * @param name backup set's name 1771 * @param tables list of tables 1772 * @return put operation 1773 */ 1774 private Put createPutForBackupSet(String name, String[] tables) { 1775 Put put = new Put(rowkey(SET_KEY_PREFIX, name)); 1776 byte[] value = convertToByteArray(tables); 1777 put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value); 1778 return put; 1779 } 1780 1781 private byte[] convertToByteArray(String[] tables) { 1782 return Bytes.toBytes(StringUtils.join(tables, ",")); 1783 } 1784 1785 /** 1786 * Converts cell to backup set list. 1787 * @param current current cell 1788 * @return backup set as array of table names 1789 */ 1790 private String[] cellValueToBackupSet(Cell current) { 1791 byte[] data = CellUtil.cloneValue(current); 1792 if (!ArrayUtils.isEmpty(data)) { 1793 return Bytes.toString(data).split(","); 1794 } 1795 return new String[0]; 1796 } 1797 1798 /** 1799 * Converts cell key to backup set name. 1800 * @param current current cell 1801 * @return backup set name 1802 */ 1803 private String cellKeyToBackupSetName(Cell current) { 1804 byte[] data = CellUtil.cloneRow(current); 1805 return Bytes.toString(data).substring(SET_KEY_PREFIX.length()); 1806 } 1807 1808 private static byte[] rowkey(String s, String... other) { 1809 StringBuilder sb = new StringBuilder(s); 1810 for (String ss : other) { 1811 sb.append(ss); 1812 } 1813 return Bytes.toBytes(sb.toString()); 1814 } 1815 1816 private static void ensureTableEnabled(Admin admin, TableName tableName) throws IOException { 1817 if (!admin.isTableEnabled(tableName)) { 1818 try { 1819 admin.enableTable(tableName); 1820 } catch (TableNotDisabledException ignored) { 1821 LOG.info("Table {} is not disabled, ignoring enable request", tableName); 1822 } 1823 } 1824 } 1825}