/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.backup.BackupAdmin;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
import org.apache.hadoop.hbase.chaos.policies.Policy;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * An integration test to detect regressions in HBASE-7912. Creates a table with many regions,
 * loads data, performs a series of backup/load operations, then restores and verifies the data.
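 * <p>
 * A typical invocation might look like the following (option names come from
 * {@code addOptions()} below; the exact launcher syntax depends on your deployment):
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.IntegrationTestBackupRestore \
 *   -num_tables 2 -regions_per_rs 10 -region_servers 5 \
 *   -rows_in_iteration 500000 -num_iterations 10
 * </pre>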
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-7912">HBASE-7912</a>
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-14123">HBASE-14123</a>
 */
@Category(IntegrationTests.class)
public class IntegrationTestBackupRestore extends IntegrationTestBase {
  private static final String CLASS_NAME = IntegrationTestBackupRestore.class.getSimpleName();
  protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBackupRestore.class);
  protected static final String NUMBER_OF_TABLES_KEY = "num_tables";
  protected static final String COLUMN_NAME = "f";
  protected static final String REGION_COUNT_KEY = "regions_per_rs";
  protected static final String REGIONSERVER_COUNT_KEY = "region_servers";
  protected static final String ROWS_PER_ITERATION_KEY = "rows_in_iteration";
  protected static final String NUM_ITERATIONS_KEY = "num_iterations";
  protected static final int DEFAULT_REGION_COUNT = 10;
  protected static final int DEFAULT_REGIONSERVER_COUNT = 5;
  protected static final int DEFAULT_NUMBER_OF_TABLES = 1;
  protected static final int DEFAULT_NUM_ITERATIONS = 10;
  protected static final int DEFAULT_ROWS_IN_ITERATION = 500000;
  protected static final String SLEEP_TIME_KEY = "sleeptime";
  // short default interval because tests don't run very long.
  protected static final long SLEEP_TIME_DEFAULT = 50000L;

  protected static int rowsInIteration;
  protected static int regionsCountPerServer;
  protected static int regionServerCount;

  protected static int numIterations;
  protected static int numTables;
  protected static TableName[] tableNames;
  protected long sleepTime;
  protected static Object lock = new Object();

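  // Relative name of the backup root; testBackupRestore() prefixes it with the
  // test filesystem data directory before any backups are taken.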
  private static String BACKUP_ROOT_DIR = "backupIT";

  @Override
  @Before
  public void setUp() throws Exception {
    util = new IntegrationTestingUtility();
    Configuration conf = util.getConfiguration();
    regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT);
    regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT);
    rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION);
    numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS);
    numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES);
    sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT);
    enableBackup(conf);
    LOG.info("Initializing cluster with {} region servers.", regionServerCount);
    util.initializeCluster(regionServerCount);
    LOG.info("Cluster initialized and ready");
  }

  @After
  public void tearDown() throws IOException {
    LOG.info("Cleaning up after test.");
    if (util.isDistributedCluster()) {
      deleteTablesIfAny();
      LOG.info("Cleaning up after test. Deleted tables");
      cleanUpBackupDir();
    }
    LOG.info("Restoring cluster.");
    util.restoreCluster();
    LOG.info("Cluster restored.");
  }

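  /**
   * Starts a chaos monkey that periodically (every {@code sleepTime} ms) restarts a random
   * region server, excluding the one hosting meta, while the backup/restore workload runs.
   */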
  @Override
  public void setUpMonkey() throws Exception {
    Policy p =
      new PeriodicRandomActionPolicy(sleepTime, new RestartRandomRsExceptMetaAction(sleepTime));
    this.monkey = new PolicyBasedChaosMonkey(util, p);
    startMonkey();
  }

  private void deleteTablesIfAny() throws IOException {
    for (TableName table : tableNames) {
      util.deleteTableIfAny(table);
    }
  }

  private void createTables() throws Exception {
    tableNames = new TableName[numTables];
    for (int i = 0; i < numTables; i++) {
      tableNames[i] = TableName.valueOf(CLASS_NAME + ".table." + i);
    }
    for (TableName table : tableNames) {
      createTable(table);
    }
  }

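  /**
   * Turns the backup feature on and lets {@link BackupManager} add the backup-specific settings
   * it needs to the master and region server configurations.
   */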
  private void enableBackup(Configuration conf) {
    // Enable backup
    conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
    BackupManager.decorateMasterConfiguration(conf);
    BackupManager.decorateRegionServerConfiguration(conf);
  }

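  /** Recursively deletes the backup root directory from the test filesystem. */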
  private void cleanUpBackupDir() throws IOException {
    FileSystem fs = FileSystem.get(util.getConfiguration());
    fs.delete(new Path(BACKUP_ROOT_DIR), true);
  }

  @Test
  public void testBackupRestore() throws Exception {
    BACKUP_ROOT_DIR = util.getDataTestDirOnTestFS() + Path.SEPARATOR + BACKUP_ROOT_DIR;
    createTables();
    runTestMulti();
  }

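  /**
   * Runs {@link #runTestSingle(TableName)} concurrently, one worker thread per table, and waits
   * for all workers to complete.
   */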
  private void runTestMulti() throws IOException {
    LOG.info("IT backup & restore started");
    Thread[] workers = new Thread[numTables];
    for (int i = 0; i < numTables; i++) {
      final TableName table = tableNames[i];
      workers[i] = new Thread(() -> {
        try {
          runTestSingle(table);
        } catch (IOException e) {
          LOG.error("Failed", e);
          Assert.fail(e.getMessage());
        }
      });
      workers[i].start();
    }
    // Wait for all workers to finish
    for (Thread t : workers) {
      Uninterruptibles.joinUninterruptibly(t);
    }
    LOG.info("IT backup & restore finished");
  }

  private void createTable(TableName tableName) throws Exception {
    long startTime, endTime;

    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);

    TableDescriptor desc = builder.build();
    ColumnFamilyDescriptorBuilder cbuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_NAME.getBytes(Charset.defaultCharset()));
    ColumnFamilyDescriptor[] columns = new ColumnFamilyDescriptor[] { cbuilder.build() };
    LOG.info("Creating table {} with {} splits.", tableName,
      regionsCountPerServer * regionServerCount);
    startTime = EnvironmentEdgeManager.currentTime();
    HBaseTestingUtility.createPreSplitLoadTestTable(util.getConfiguration(), desc, columns,
      regionsCountPerServer);
    util.waitTableAvailable(tableName);
    endTime = EnvironmentEdgeManager.currentTime();
    LOG.info("Pre-split table created successfully in {}ms.", (endTime - startTime));
  }

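  /**
   * Loads {@code numRows} rows of random data into the table, then flushes it so the new edits
   * are persisted to store files.
   */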
  private void loadData(TableName table, int numRows) throws IOException {
    Connection conn = util.getConnection();
    // insert the data into the table, closing it when done
    try (Table t1 = conn.getTable(table)) {
      util.loadRandomRows(t1, COLUMN_NAME.getBytes(Charset.defaultCharset()), 100, numRows);
    }
    // flush the table so the new edits are persisted to store files
    conn.getAdmin().flush(table);
  }

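  // Thin wrappers around BackupAdmin so the backup, restore and merge steps below read uniformly.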
  private String backup(BackupRequest request, BackupAdmin client) throws IOException {
    return client.backupTables(request);
  }

  private void restore(RestoreRequest request, BackupAdmin client) throws IOException {
    client.restore(request);
  }

  private void merge(String[] backupIds, BackupAdmin client) throws IOException {
    client.mergeBackups(backupIds);
  }

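  /**
   * Exercises one table end to end: take a full backup, then for each iteration load more data,
   * take an incremental backup, and restore/verify both the previous and the latest images;
   * finally merge all incremental images and restore/verify the merged result.
   */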
  private void runTestSingle(TableName table) throws IOException {
    List<String> backupIds = new ArrayList<>();

    try (Connection conn = util.getConnection(); Admin admin = conn.getAdmin();
      BackupAdmin client = new BackupAdminImpl(conn)) {

      // #0 - insert some data into the table
      loadData(table, rowsInIteration);

      // #1 - create a full backup of the table first
      LOG.info("create full backup image for {}", table);
      List<TableName> tables = Lists.newArrayList(table);
      BackupRequest.Builder builder = new BackupRequest.Builder();
      BackupRequest request = builder.withBackupType(BackupType.FULL).withTableList(tables)
        .withTargetRootDir(BACKUP_ROOT_DIR).build();

      String backupIdFull = backup(request, client);
      assertTrue(checkSucceeded(backupIdFull));

      backupIds.add(backupIdFull);
      // Now continue with incremental backups
      int count = 1;
      while (count++ < numIterations) {
        // Load data
        loadData(table, rowsInIteration);
        // Do incremental backup
        builder = new BackupRequest.Builder();
        request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables)
          .withTargetRootDir(BACKUP_ROOT_DIR).build();
        String backupId = backup(request, client);
        assertTrue(checkSucceeded(backupId));
        backupIds.add(backupId);

        // Restore the previous backup for the table, overwriting the current contents
        String previousBackupId = backupIds.get(backupIds.size() - 2);
        restoreVerifyTable(conn, client, table, previousBackupId, rowsInIteration * (count - 1));
        // Restore the latest backup for the table, overwriting the current contents
        restoreVerifyTable(conn, client, table, backupId, rowsInIteration * count);
      }
      // Now merge all incremental backups and restore the merged image
      String[] incBackupIds = allIncremental(backupIds);
      merge(incBackupIds, client);
      // Restore the last one
      String backupId = incBackupIds[incBackupIds.length - 1];
      // restore the incremental backup for the table, with overwrite
      TableName[] tablesRestoreIncMultiple = new TableName[] { table };
      restore(createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null,
        true), client);
      try (Table hTable = conn.getTable(table)) {
        Assert.assertEquals(rowsInIteration * numIterations, util.countRows(hTable));
      }
      LOG.info("{} loop {} finished.", Thread.currentThread().getName(), (count - 1));
    }
  }

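  /**
   * Restores the given backup over the existing table (with overwrite) and verifies the expected
   * row count.
   */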
  private void restoreVerifyTable(Connection conn, BackupAdmin client, TableName table,
    String backupId, long expectedRows) throws IOException {
    TableName[] tablesRestoreIncMultiple = new TableName[] { table };
    restore(
      createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null, true),
      client);
    try (Table hTable = conn.getTable(table)) {
      Assert.assertEquals(expectedRows, util.countRows(hTable));
    }
  }

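  /** Returns all backup ids except the first one, which is the initial full backup. */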
  private String[] allIncremental(List<String> backupIds) {
    // Skip the first id: it is the initial full backup, the rest are incremental
    List<String> incremental = backupIds.subList(1, backupIds.size());
    return incremental.toArray(new String[0]);
  }

  /**
   * Checks whether the backup with the given id succeeded.
   * @param backupId backup ID to check the status of
   * @return true if the backup completed successfully
   */
  protected boolean checkSucceeded(String backupId) throws IOException {
    BackupInfo status = getBackupInfo(backupId);
    if (status == null) {
      return false;
    }
    return status.getState() == BackupState.COMPLETE;
  }

  private BackupInfo getBackupInfo(String backupId) throws IOException {
    try (BackupSystemTable table = new BackupSystemTable(util.getConnection())) {
      return table.readBackupInfo(backupId);
    }
  }

  /**
   * Creates a restore request.
   * @param backupRootDir directory where the backup is located
   * @param backupId      backup ID
   * @param check         check the backup
   * @param fromTables    table names to restore from
   * @param toTables      new table names to restore to
   * @param isOverwrite   overwrite the table(s)
   * @return an instance of RestoreRequest
   */
  public RestoreRequest createRestoreRequest(String backupRootDir, String backupId, boolean check,
    TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
    RestoreRequest.Builder builder = new RestoreRequest.Builder();
    // Note: withOvewrite (sic) is the spelling of the builder method in the backup API
    return builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check)
      .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build();
  }

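  /**
   * Cluster initialization used when the test is launched as a command-line tool; the JUnit path
   * initializes the cluster in {@link #setUp()} instead.
   */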
  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    enableBackup(getConf());
    LOG.debug("Initializing/checking cluster has {} servers", regionServerCount);
    util.initializeCluster(regionServerCount);
    LOG.debug("Done initializing/checking cluster");
  }

  /** Returns status of CLI execution */
  @Override
  public int runTestFromCommandLine() throws Exception {
    // Check if backup is enabled
    if (!BackupManager.isBackupEnabled(getConf())) {
      System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
      return -1;
    }
    System.out.println(BackupRestoreConstants.VERIFY_BACKUP);
    testBackupRestore();
    return 0;
  }

  @Override
  public TableName getTablename() {
    // That is only valid when Monkey is CALM (no monkey)
    return null;
  }

  @Override
  protected Set<String> getColumnFamilies() {
    // That is only valid when Monkey is CALM (no monkey)
    return null;
  }

  @Override
  protected void addOptions() {
    addOptWithArg(REGIONSERVER_COUNT_KEY,
      "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'");
    addOptWithArg(REGION_COUNT_KEY,
      "Number of regions per region server. Default: " + DEFAULT_REGION_COUNT);
    addOptWithArg(ROWS_PER_ITERATION_KEY,
      "Total number of data rows to be loaded during one iteration." + " Default: "
        + DEFAULT_ROWS_IN_ITERATION);
    addOptWithArg(NUM_ITERATIONS_KEY,
      "Total number of iterations." + " Default: " + DEFAULT_NUM_ITERATIONS);
    addOptWithArg(NUMBER_OF_TABLES_KEY,
      "Total number of tables in the test." + " Default: " + DEFAULT_NUMBER_OF_TABLES);
    addOptWithArg(SLEEP_TIME_KEY, "Sleep time of chaos monkey in ms "
      + "to restart random region server. Default: " + SLEEP_TIME_DEFAULT);
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    regionsCountPerServer = Integer
      .parseInt(cmd.getOptionValue(REGION_COUNT_KEY, Integer.toString(DEFAULT_REGION_COUNT)));
    regionServerCount = Integer.parseInt(
      cmd.getOptionValue(REGIONSERVER_COUNT_KEY, Integer.toString(DEFAULT_REGIONSERVER_COUNT)));
    rowsInIteration = Integer.parseInt(
      cmd.getOptionValue(ROWS_PER_ITERATION_KEY, Integer.toString(DEFAULT_ROWS_IN_ITERATION)));
    numIterations = Integer
      .parseInt(cmd.getOptionValue(NUM_ITERATIONS_KEY, Integer.toString(DEFAULT_NUM_ITERATIONS)));
    numTables = Integer.parseInt(
      cmd.getOptionValue(NUMBER_OF_TABLES_KEY, Integer.toString(DEFAULT_NUMBER_OF_TABLES)));
    sleepTime =
      Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, Long.toString(SLEEP_TIME_DEFAULT)));

    LOG.info(MoreObjects.toStringHelper("Parsed Options")
      .add(REGION_COUNT_KEY, regionsCountPerServer).add(REGIONSERVER_COUNT_KEY, regionServerCount)
      .add(ROWS_PER_ITERATION_KEY, rowsInIteration).add(NUM_ITERATIONS_KEY, numIterations)
      .add(NUMBER_OF_TABLES_KEY, numTables).add(SLEEP_TIME_KEY, sleepTime).toString());
  }

  /**
   * Main method
   * @param args argument list
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestBackupRestore(), args);
    System.exit(status);
  }
}