/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;

import static org.apache.hadoop.hbase.backup.BackupInfo.BackupState.COMPLETE;
import static org.apache.hadoop.hbase.backup.BackupTestUtil.enableBackup;
import static org.apache.hadoop.hbase.backup.BackupTestUtil.verifyBackup;
import static org.apache.hadoop.hbase.backup.BackupType.FULL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testing.TestingHBaseCluster;
import org.apache.hadoop.hbase.testing.TestingHBaseClusterOption;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(MediumTests.class)
@RunWith(Parameterized.class)
public class TestBackupRestoreWithModifications {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestBackupRestoreWithModifications.class);

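  // Standard HBase class rule: applies the class-level timeout for the declared test category
  // (MediumTests here).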
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBackupRestoreWithModifications.class);

  @Parameterized.Parameters(name = "{index}: useBulkLoad={0}")
  public static Iterable<Object[]> data() {
    return HBaseCommonTestingUtil.BOOLEAN_PARAMETERIZED;
  }

  @Parameterized.Parameter(0)
  public boolean useBulkLoad;

  private TableName sourceTable;
  private TableName targetTable;

  private List<TableName> allTables;
  private static TestingHBaseCluster cluster;
  private static final Path BACKUP_ROOT_DIR = new Path("backupIT");
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("0");

  @BeforeClass
  public static void beforeClass() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    enableBackup(conf);
    cluster = TestingHBaseCluster.create(TestingHBaseClusterOption.builder().conf(conf).build());
    cluster.start();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    cluster.stop();
  }

  @Before
  public void setUp() throws Exception {
    sourceTable = TableName.valueOf("table-" + useBulkLoad);
    targetTable = TableName.valueOf("another-table-" + useBulkLoad);
    allTables = Arrays.asList(sourceTable, targetTable);
    createTable(sourceTable);
    createTable(targetTable);
  }

  @Test
  public void testModificationsOnTable() throws Exception {
    Instant timestamp = Instant.now();

    // load some data
    load(sourceTable, timestamp, "data");

    String backupId = backup(FULL, allTables);
    BackupInfo backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
    assertTrue(backupInfo.getTables().contains(sourceTable));

    restore(backupId, sourceTable, targetTable);
    validateDataEquals(sourceTable, "data");
    validateDataEquals(targetTable, "data");

    // load new data at the same timestamp
    load(sourceTable, timestamp, "changed_data");

    backupId = backup(FULL, allTables);
    backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
    assertTrue(backupInfo.getTables().contains(sourceTable));

    restore(backupId, sourceTable, targetTable);
    validateDataEquals(sourceTable, "changed_data");
    validateDataEquals(targetTable, "changed_data");
  }

  private void createTable(TableName tableName) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Admin admin = connection.getAdmin()) {
      admin.createTable(builder.build());
    }
  }

  private void load(TableName tableName, Instant timestamp, String data) throws IOException {
    if (useBulkLoad) {
      hFileBulkLoad(tableName, timestamp, data);
    } else {
      putLoad(tableName, timestamp, data);
    }
  }

  private void putLoad(TableName tableName, Instant timestamp, String data) throws IOException {
    LOG.info("Writing new data to HBase using normal Puts: {}", data);
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Table table = connection.getTable(tableName)) {
      List<Put> puts = new ArrayList<>();
      for (int i = 0; i < 10; i++) {
        Put put = new Put(Bytes.toBytes(i), timestamp.toEpochMilli());
        put.addColumn(COLUMN_FAMILY, Bytes.toBytes("data"), Bytes.toBytes(data));
        puts.add(put);

        // periodically flush the accumulated puts (only i == 0 triggers here, since just 10 rows
        // are written; the remainder is flushed after the loop)
        if (i % 100 == 0) {
          table.put(puts);
          puts.clear();
        }
      }
      if (!puts.isEmpty()) {
        table.put(puts);
      }
      connection.getAdmin().flush(tableName);
    }
  }

  private void hFileBulkLoad(TableName tableName, Instant timestamp, String data)
    throws IOException {
    FileSystem fs = FileSystem.get(cluster.getConf());
    LOG.info("Writing new data to HBase using BulkLoad: {}", data);
    // HFiles require this strict directory structure (one subdirectory per column family) in
    // order to be bulk loaded
    Path hFileRootPath = new Path("/tmp/hfiles_" + UUID.randomUUID());
    fs.mkdirs(hFileRootPath);
    Path hFileFamilyPath = new Path(hFileRootPath, Bytes.toString(COLUMN_FAMILY));
    fs.mkdirs(hFileFamilyPath);
    try (HFile.Writer writer = HFile.getWriterFactoryNoCache(cluster.getConf())
      .withPath(fs, new Path(hFileFamilyPath, "hfile_" + UUID.randomUUID()))
      .withFileContext(new HFileContextBuilder().withTableName(tableName.toBytes())
        .withColumnFamily(COLUMN_FAMILY).build())
      .create()) {
      for (int i = 0; i < 10; i++) {
        writer.append(new KeyValue(Bytes.toBytes(i), COLUMN_FAMILY, Bytes.toBytes("data"),
          timestamp.toEpochMilli(), Bytes.toBytes(data)));
      }
    }
    Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
      BulkLoadHFiles.create(cluster.getConf()).bulkLoad(tableName, hFileRootPath);
    assertFalse(result.isEmpty());
  }

  private String backup(BackupType backupType, List<TableName> tables) throws IOException {
    LOG.info("Creating the backup ...");

    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
      BackupRequest backupRequest =
        new BackupRequest.Builder().withTargetRootDir(BACKUP_ROOT_DIR.toString())
          .withTableList(new ArrayList<>(tables)).withBackupType(backupType).build();
      return backupAdmin.backupTables(backupRequest);
    }
  }

  private void restore(String backupId, TableName sourceTableName, TableName targetTableName)
    throws IOException {
    LOG.info("Restoring data ...");
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
      RestoreRequest restoreRequest = new RestoreRequest.Builder().withBackupId(backupId)
        .withBackupRootDir(BACKUP_ROOT_DIR.toString()).withOvewrite(true)
        .withFromTables(new TableName[] { sourceTableName })
        .withToTables(new TableName[] { targetTableName }).build();
      backupAdmin.restore(restoreRequest);
    }
  }

  private void validateDataEquals(TableName tableName, String expectedData) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Table table = connection.getTable(tableName)) {
      Scan scan = new Scan();
      scan.readAllVersions();
      scan.setRaw(true);
      scan.setBatch(100);

      for (Result sourceResult : table.getScanner(scan)) {
        List<Cell> sourceCells = sourceResult.listCells();
        for (Cell cell : sourceCells) {
          assertEquals(expectedData, Bytes.toStringBinary(cell.getValueArray(),
            cell.getValueOffset(), cell.getValueLength()));
        }
      }
    }
  }

}