/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;

import static org.apache.hadoop.hbase.backup.BackupInfo.BackupState.COMPLETE;
import static org.apache.hadoop.hbase.backup.BackupTestUtil.enableBackup;
import static org.apache.hadoop.hbase.backup.BackupTestUtil.verifyBackup;
import static org.apache.hadoop.hbase.backup.BackupType.FULL;
import static org.apache.hadoop.hbase.backup.BackupType.INCREMENTAL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testing.TestingHBaseCluster;
import org.apache.hadoop.hbase.testing.TestingHBaseClusterOption;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks that a backup can be restored after both the source and the target table have been
 * deleted. Each test runs twice: once restoring into the original table and once restoring into
 * a different table, selected by {@link #restoreToOtherTable}.
 */
@Category(MediumTests.class)
@RunWith(Parameterized.class)
public class TestBackupRestoreOnEmptyEnvironment {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestBackupRestoreOnEmptyEnvironment.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBackupRestoreOnEmptyEnvironment.class);

  @Parameterized.Parameters(name = "{index}: restoreToOtherTable={0}")
  public static Iterable<Object[]> data() {
    return HBaseCommonTestingUtil.BOOLEAN_PARAMETERIZED;
  }

  /** When true the backup is restored into {@link #targetTable} instead of the source table. */
  @Parameterized.Parameter(0)
  public boolean restoreToOtherTable;
  private TableName sourceTable;
  private TableName targetTable;

  private static TestingHBaseCluster cluster;
  private static Path BACKUP_ROOT_DIR;
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("0");

  /** Starts a testing cluster with backup enabled and derives the backup root directory. */
  @BeforeClass
  public static void beforeClass() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    enableBackup(conf);
    cluster = TestingHBaseCluster.create(TestingHBaseClusterOption.builder().conf(conf).build());
    cluster.start();
    BACKUP_ROOT_DIR = new Path(new Path(conf.get("fs.defaultFS")), new Path("/backupIT"));
  }

  @AfterClass
  public static void afterClass() throws Exception {
    cluster.stop();
  }

  /** Creates the source and target tables before every test. */
  @Before
  public void setUp() throws Exception {
    sourceTable = TableName.valueOf("table");
    targetTable = TableName.valueOf("another-table");
    createTable(sourceTable);
    createTable(targetTable);
  }

  @After
  public void removeTables() throws Exception {
    deleteTables();
  }

  /** Takes a full backup, deletes both tables, restores and checks the restored values. */
  @Test
  public void testRestoreToCorrectTable() throws Exception {
    Instant timestamp = Instant.now().minusSeconds(10);

    // load some data
    putLoad(sourceTable, timestamp, "data");

    String backupId = backup(FULL, Collections.singletonList(sourceTable));
    BackupInfo backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
    assertTrue(backupInfo.getTables().contains(sourceTable));

    LOG.info("Deleting the tables before restore ...");
    deleteTables();

    if (restoreToOtherTable) {
      restore(backupId, sourceTable, targetTable);
      validateDataEquals(targetTable, "data");
    } else {
      restore(backupId, sourceTable, sourceTable);
      validateDataEquals(sourceTable, "data");
    }
  }

  /**
   * Takes a full backup followed by an incremental backup carrying newer values, deletes both
   * tables, restores the incremental backup and checks that the newer values are visible.
   */
  @Test
  public void testRestoreCorrectTableForIncremental() throws Exception {
    Instant timestamp = Instant.now().minusSeconds(10);

    // load some data
    putLoad(sourceTable, timestamp, "data");

    String backupId = backup(FULL, Collections.singletonList(sourceTable));
    verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);

    // some incremental data
    putLoad(sourceTable, timestamp.plusMillis(1), "new_data");

    String backupId2 = backup(INCREMENTAL, Collections.singletonList(sourceTable));
    verifyBackup(cluster.getConf(), backupId2, INCREMENTAL, COMPLETE);

    LOG.info("Deleting the tables before restore ...");
    deleteTables();

    if (restoreToOtherTable) {
      restore(backupId2, sourceTable, targetTable);
      validateDataEquals(targetTable, "new_data");
    } else {
      restore(backupId2, sourceTable, sourceTable);
      validateDataEquals(sourceTable, "new_data");
    }
  }

  /** Creates {@code tableName} with the single column family {@link #COLUMN_FAMILY}. */
  private void createTable(TableName tableName) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Admin admin = connection.getAdmin()) {
      admin.createTable(builder.build());
    }
  }

  /** Disables and deletes the source and target tables if they exist. */
  private void deleteTables() throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Admin admin = connection.getAdmin()) {
      for (TableName table : Arrays.asList(sourceTable, targetTable)) {
        if (admin.tableExists(table)) {
          admin.disableTable(table);
          admin.deleteTable(table);
        }
      }
    }
  }

  /**
   * Writes ten rows (row keys 0..9) into {@code tableName}, each with one cell in
   * {@link #COLUMN_FAMILY} under qualifier {@code "data"}, then flushes the table.
   * @param tableName table to write to and flush
   * @param timestamp timestamp applied to every Put
   * @param data      value stored in every cell
   */
  private void putLoad(TableName tableName, Instant timestamp, String data) throws IOException {
    LOG.info("Writing new data to HBase using normal Puts: {}", data);
    // The table is looked up by the tableName argument (not the sourceTable field) so that the
    // rows land in the same table that is flushed below. Table and Admin are closed on exit.
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Table table = connection.getTable(tableName);
      Admin admin = connection.getAdmin()) {
      List<Put> puts = new ArrayList<>();
      for (int i = 0; i < 10; i++) {
        Put put = new Put(Bytes.toBytes(i), timestamp.toEpochMilli());
        put.addColumn(COLUMN_FAMILY, Bytes.toBytes("data"), Bytes.toBytes(data));
        puts.add(put);
      }
      // Ten rows fit comfortably in a single batch.
      table.put(puts);
      admin.flush(tableName);
    }
  }

  /**
   * Runs a backup of {@code tables} into {@link #BACKUP_ROOT_DIR}.
   * @return the value returned by {@code BackupAdmin.backupTables}, used by callers as backup id
   */
  private String backup(BackupType backupType, List<TableName> tables) throws IOException {
    LOG.info("Creating the backup ...");

    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
      BackupRequest backupRequest =
        new BackupRequest.Builder().withTargetRootDir(BACKUP_ROOT_DIR.toString())
          .withTableList(new ArrayList<>(tables)).withBackupType(backupType).build();
      return backupAdmin.backupTables(backupRequest);
    }
  }

  /**
   * Restores {@code sourceTableName} from backup {@code backupId} into {@code targetTableName},
   * with the overwrite flag set.
   */
  private void restore(String backupId, TableName sourceTableName, TableName targetTableName)
    throws IOException {
    LOG.info("Restoring data ...");
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
      RestoreRequest restoreRequest = new RestoreRequest.Builder().withBackupId(backupId)
        .withBackupRootDir(BACKUP_ROOT_DIR.toString()).withOvewrite(true)
        .withFromTables(new TableName[] { sourceTableName })
        .withToTables(new TableName[] { targetTableName }).build();
      backupAdmin.restore(restoreRequest);
    }
  }

  /**
   * Scans {@code tableName} and asserts that every returned cell holds {@code expectedData} and
   * that the scan returned at least one cell.
   */
  private void validateDataEquals(TableName tableName, String expectedData) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Table table = connection.getTable(tableName)) {
      Scan scan = new Scan();
      scan.setRaw(true);
      scan.setBatch(100);

      int cellCount = 0;
      // Close the scanner explicitly; it is not released by closing the Table.
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
          for (Cell cell : result.listCells()) {
            assertEquals(expectedData, Bytes.toStringBinary(cell.getValueArray(),
              cell.getValueOffset(), cell.getValueLength()));
            cellCount++;
          }
        }
      }
      // Without this check an empty table would pass, since the loop body never runs.
      assertTrue("Expected restored data in table " + tableName + " but the scan returned no cells",
        cellCount > 0);
    }
  }
}