001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup;
019
020import static org.apache.hadoop.hbase.backup.BackupInfo.BackupState.COMPLETE;
021import static org.apache.hadoop.hbase.backup.BackupTestUtil.enableBackup;
022import static org.apache.hadoop.hbase.backup.BackupTestUtil.verifyBackup;
023import static org.apache.hadoop.hbase.backup.BackupType.FULL;
024import static org.apache.hadoop.hbase.backup.BackupType.INCREMENTAL;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertTrue;
027
028import java.io.IOException;
029import java.time.Instant;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.Collections;
033import java.util.List;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
042import org.apache.hadoop.hbase.client.Admin;
043import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
044import org.apache.hadoop.hbase.client.Connection;
045import org.apache.hadoop.hbase.client.ConnectionFactory;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.testing.TestingHBaseCluster;
053import org.apache.hadoop.hbase.testing.TestingHBaseClusterOption;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.junit.After;
056import org.junit.AfterClass;
057import org.junit.Before;
058import org.junit.BeforeClass;
059import org.junit.ClassRule;
060import org.junit.Test;
061import org.junit.experimental.categories.Category;
062import org.junit.runner.RunWith;
063import org.junit.runners.Parameterized;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067@Category(MediumTests.class)
068@RunWith(Parameterized.class)
069public class TestBackupRestoreOnEmptyEnvironment {
070
071  private static final Logger LOG =
072    LoggerFactory.getLogger(TestBackupRestoreOnEmptyEnvironment.class);
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076    HBaseClassTestRule.forClass(TestBackupRestoreOnEmptyEnvironment.class);
077
078  @Parameterized.Parameters(name = "{index}: restoreToOtherTable={0}")
079  public static Iterable<Object[]> data() {
080    return HBaseCommonTestingUtility.BOOLEAN_PARAMETERIZED;
081  }
082
083  @Parameterized.Parameter(0)
084  public boolean restoreToOtherTable;
085  private TableName sourceTable;
086  private TableName targetTable;
087
088  private static TestingHBaseCluster cluster;
089  private static Path BACKUP_ROOT_DIR;
090  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("0");
091
092  @BeforeClass
093  public static void beforeClass() throws Exception {
094    Configuration conf = HBaseConfiguration.create();
095    enableBackup(conf);
096    cluster = TestingHBaseCluster.create(TestingHBaseClusterOption.builder().conf(conf).build());
097    cluster.start();
098    BACKUP_ROOT_DIR = new Path(new Path(conf.get("fs.defaultFS")), new Path("/backupIT"));
099  }
100
101  @AfterClass
102  public static void afterClass() throws Exception {
103    cluster.stop();
104  }
105
106  @Before
107  public void setUp() throws Exception {
108    sourceTable = TableName.valueOf("table");
109    targetTable = TableName.valueOf("another-table");
110    createTable(sourceTable);
111    createTable(targetTable);
112  }
113
114  @After
115  public void removeTables() throws Exception {
116    deleteTables();
117  }
118
119  @Test
120  public void testRestoreToCorrectTable() throws Exception {
121    Instant timestamp = Instant.now().minusSeconds(10);
122
123    // load some data
124    putLoad(sourceTable, timestamp, "data");
125
126    String backupId = backup(FULL, Collections.singletonList(sourceTable));
127    BackupInfo backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
128    assertTrue(backupInfo.getTables().contains(sourceTable));
129
130    LOG.info("Deleting the tables before restore ...");
131    deleteTables();
132
133    if (restoreToOtherTable) {
134      restore(backupId, sourceTable, targetTable);
135      validateDataEquals(targetTable, "data");
136    } else {
137      restore(backupId, sourceTable, sourceTable);
138      validateDataEquals(sourceTable, "data");
139    }
140
141  }
142
143  @Test
144  public void testRestoreCorrectTableForIncremental() throws Exception {
145    Instant timestamp = Instant.now().minusSeconds(10);
146
147    // load some data
148    putLoad(sourceTable, timestamp, "data");
149
150    String backupId = backup(FULL, Collections.singletonList(sourceTable));
151    verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
152
153    // some incremental data
154    putLoad(sourceTable, timestamp.plusMillis(1), "new_data");
155
156    String backupId2 = backup(INCREMENTAL, Collections.singletonList(sourceTable));
157    verifyBackup(cluster.getConf(), backupId2, INCREMENTAL, COMPLETE);
158
159    LOG.info("Deleting the tables before restore ...");
160    deleteTables();
161
162    if (restoreToOtherTable) {
163      restore(backupId2, sourceTable, targetTable);
164      validateDataEquals(targetTable, "new_data");
165    } else {
166      restore(backupId2, sourceTable, sourceTable);
167      validateDataEquals(sourceTable, "new_data");
168    }
169
170  }
171
172  private void createTable(TableName tableName) throws IOException {
173    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
174      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
175    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
176      Admin admin = connection.getAdmin()) {
177      admin.createTable(builder.build());
178    }
179  }
180
181  private void deleteTables() throws IOException {
182    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
183      Admin admin = connection.getAdmin()) {
184      for (TableName table : Arrays.asList(sourceTable, targetTable)) {
185        if (admin.tableExists(table)) {
186          admin.disableTable(table);
187          admin.deleteTable(table);
188        }
189      }
190    }
191  }
192
193  private void putLoad(TableName tableName, Instant timestamp, String data) throws IOException {
194    LOG.info("Writing new data to HBase using normal Puts: {}", data);
195    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf())) {
196      Table table = connection.getTable(sourceTable);
197      List<Put> puts = new ArrayList<>();
198      for (int i = 0; i < 10; i++) {
199        Put put = new Put(Bytes.toBytes(i), timestamp.toEpochMilli());
200        put.addColumn(COLUMN_FAMILY, Bytes.toBytes("data"), Bytes.toBytes(data));
201        puts.add(put);
202
203        if (i % 100 == 0) {
204          table.put(puts);
205          puts.clear();
206        }
207      }
208      if (!puts.isEmpty()) {
209        table.put(puts);
210      }
211      connection.getAdmin().flush(tableName);
212    }
213  }
214
215  private String backup(BackupType backupType, List<TableName> tables) throws IOException {
216    LOG.info("Creating the backup ...");
217
218    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
219      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
220      BackupRequest backupRequest =
221        new BackupRequest.Builder().withTargetRootDir(BACKUP_ROOT_DIR.toString())
222          .withTableList(new ArrayList<>(tables)).withBackupType(backupType).build();
223      return backupAdmin.backupTables(backupRequest);
224    }
225
226  }
227
228  private void restore(String backupId, TableName sourceTableName, TableName targetTableName)
229    throws IOException {
230    LOG.info("Restoring data ...");
231    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
232      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
233      RestoreRequest restoreRequest = new RestoreRequest.Builder().withBackupId(backupId)
234        .withBackupRootDir(BACKUP_ROOT_DIR.toString()).withOvewrite(true)
235        .withFromTables(new TableName[] { sourceTableName })
236        .withToTables(new TableName[] { targetTableName }).build();
237      backupAdmin.restore(restoreRequest);
238    }
239  }
240
241  private void validateDataEquals(TableName tableName, String expectedData) throws IOException {
242    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
243      Table table = connection.getTable(tableName)) {
244      Scan scan = new Scan();
245      scan.setRaw(true);
246      scan.setBatch(100);
247
248      for (Result sourceResult : table.getScanner(scan)) {
249        List<Cell> sourceCells = sourceResult.listCells();
250        for (Cell cell : sourceCells) {
251          assertEquals(expectedData, Bytes.toStringBinary(cell.getValueArray(),
252            cell.getValueOffset(), cell.getValueLength()));
253        }
254      }
255    }
256  }
257}