/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;

import static org.apache.hadoop.hbase.backup.BackupInfo.BackupState.COMPLETE;
import static org.apache.hadoop.hbase.backup.BackupTestUtil.enableBackup;
import static org.apache.hadoop.hbase.backup.BackupTestUtil.verifyBackup;
import static org.apache.hadoop.hbase.backup.BackupType.FULL;
import static org.apache.hadoop.hbase.backup.BackupType.INCREMENTAL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testing.TestingHBaseCluster;
import org.apache.hadoop.hbase.testing.TestingHBaseClusterOption;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

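/**
 * Tests that merging incremental backups succeeds when the backed-up data was written either
 * with normal {@link Put}s or via {@link BulkLoadHFiles}; the {@code useBulkLoad} parameter
 * selects the write path, and both runs go through the same backup, merge and validation flow.
 */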
@Category(MediumTests.class)
@RunWith(Parameterized.class)
public class TestIncrementalBackupMergeWithBulkLoad {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestIncrementalBackupMergeWithBulkLoad.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestIncrementalBackupMergeWithBulkLoad.class);

  @Parameterized.Parameters(name = "{index}: useBulkLoad={0}")
  public static Iterable<Object[]> data() {
    return HBaseCommonTestingUtil.BOOLEAN_PARAMETERIZED;
  }

  @Parameterized.Parameter(0)
  public boolean useBulkLoad;

  private TableName sourceTable;
  private TableName targetTable;

  private List<TableName> allTables;
  private static TestingHBaseCluster cluster;
  private static final Path BACKUP_ROOT_DIR = new Path("backupIT");
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("0");

  @BeforeClass
  public static void beforeClass() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    enableBackup(conf);
    cluster = TestingHBaseCluster.create(TestingHBaseClusterOption.builder().conf(conf).build());
    cluster.start();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    cluster.stop();
  }

  @Before
  public void setUp() throws Exception {
    sourceTable = TableName.valueOf("table-" + useBulkLoad);
    targetTable = TableName.valueOf("another-table-" + useBulkLoad);
    allTables = Arrays.asList(sourceTable, targetTable);
    createTable(sourceTable);
    createTable(targetTable);
  }

  @Test
  public void testMergeContainingBulkloadedHfiles() throws Exception {
    Instant timestamp = Instant.now();

    String backupId = backup(FULL, allTables);
    BackupInfo backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
    assertTrue(backupInfo.getTables().contains(sourceTable));

    // load some data
    load(sourceTable, timestamp, "data");

    String backupId1 = backup(INCREMENTAL, allTables);
    backupInfo = verifyBackup(cluster.getConf(), backupId1, INCREMENTAL, COMPLETE);
    assertTrue(backupInfo.getTables().contains(sourceTable));

    String backupId2 = backup(INCREMENTAL, allTables);
    backupInfo = verifyBackup(cluster.getConf(), backupId2, INCREMENTAL, COMPLETE);
    assertTrue(backupInfo.getTables().contains(sourceTable));

    merge(new String[] { backupId1, backupId2 });
    backupInfo = verifyBackup(cluster.getConf(), backupId2, INCREMENTAL, COMPLETE);
    assertTrue(backupInfo.getTables().contains(sourceTable));
    validateDataEquals(sourceTable, "data");
  }

  private void createTable(TableName tableName) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Admin admin = connection.getAdmin()) {
      admin.createTable(builder.build());
    }
  }

  private void load(TableName tableName, Instant timestamp, String data) throws IOException {
    if (useBulkLoad) {
      hFileBulkLoad(tableName, timestamp, data);
    } else {
      putLoad(tableName, timestamp, data);
    }
  }

  private void putLoad(TableName tableName, Instant timestamp, String data) throws IOException {
    LOG.info("Writing new data to HBase using normal Puts: {}", data);
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Table table = connection.getTable(tableName)) {
      List<Put> puts = new ArrayList<>();
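      // Write 10 small rows at the given timestamp, so Put-based runs produce the same cells
      // as the bulk-load-based runs in hFileBulkLoad.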
      for (int i = 0; i < 10; i++) {
        Put put = new Put(Bytes.toBytes(i), timestamp.toEpochMilli());
        put.addColumn(COLUMN_FAMILY, Bytes.toBytes("data"), Bytes.toBytes(data));
        puts.add(put);

        if (i % 100 == 0) {
          table.put(puts);
          puts.clear();
        }
      }
      if (!puts.isEmpty()) {
        table.put(puts);
      }
      connection.getAdmin().flush(tableName);
    }
  }

  private void hFileBulkLoad(TableName tableName, Instant timestamp, String data)
    throws IOException {
    FileSystem fs = FileSystem.get(cluster.getConf());
    LOG.info("Writing new data to HBase using BulkLoad: {}", data);
    // HFiles require this strict directory structure in order to be bulk loaded
    Path hFileRootPath = new Path("/tmp/hfiles_" + UUID.randomUUID());
    fs.mkdirs(hFileRootPath);
    Path hFileFamilyPath = new Path(hFileRootPath, Bytes.toString(COLUMN_FAMILY));
    fs.mkdirs(hFileFamilyPath);
    try (HFile.Writer writer = HFile.getWriterFactoryNoCache(cluster.getConf())
      .withPath(fs, new Path(hFileFamilyPath, "hfile_" + UUID.randomUUID()))
      .withFileContext(new HFileContextBuilder().withTableName(tableName.toBytes())
        .withColumnFamily(COLUMN_FAMILY).build())
      .create()) {
      for (int i = 0; i < 10; i++) {
        writer.append(new KeyValue(Bytes.toBytes(i), COLUMN_FAMILY, Bytes.toBytes("data"),
          timestamp.toEpochMilli(), Bytes.toBytes(data)));
      }
    }
    Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
      BulkLoadHFiles.create(cluster.getConf()).bulkLoad(tableName, hFileRootPath);
    assertFalse(result.isEmpty());
  }

  private String backup(BackupType backupType, List<TableName> tables) throws IOException {
    LOG.info("Creating the backup ...");

    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
      BackupRequest backupRequest =
        new BackupRequest.Builder().withTargetRootDir(BACKUP_ROOT_DIR.toString())
          .withTableList(new ArrayList<>(tables)).withBackupType(backupType).build();
      return backupAdmin.backupTables(backupRequest);
    }
  }

  private void merge(String[] backupIds) throws IOException {
    LOG.info("Merging the backups ...");

    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
      backupAdmin.mergeBackups(backupIds);
    }
  }

  private void validateDataEquals(TableName tableName, String expectedData) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Table table = connection.getTable(tableName)) {
      Scan scan = new Scan();
      scan.readAllVersions();
      scan.setRaw(true);
      scan.setBatch(100);

      for (Result sourceResult : table.getScanner(scan)) {
        List<Cell> sourceCells = sourceResult.listCells();
        for (Cell cell : sourceCells) {
          assertEquals(expectedData, Bytes.toStringBinary(cell.getValueArray(),
            cell.getValueOffset(), cell.getValueLength()));
        }
      }
    }
  }

}