/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.apache.hadoop.hbase.IntegrationTestingUtility.createPreSplitLoadTestTable;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.backup.BackupAdmin;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
import org.apache.hadoop.hbase.chaos.policies.Policy;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
/**
 * An integration test to detect regressions in HBASE-7912. Creates a table with many regions,
 * loads data, performs a series of backup/restore operations, then verifies the restored data.
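 * <p>
 * A typical invocation, as a sketch (the option names are the ones registered in
 * {@code addOptions()}; the exact launcher command depends on your deployment):
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.IntegrationTestBackupRestore \
 *   -region_servers 5 -regions_per_rs 10 -num_tables 2 \
 *   -num_iterations 5 -rows_in_iteration 100000
 * </pre>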
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-7912">HBASE-7912</a>
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-14123">HBASE-14123</a>
 */
@Category(IntegrationTests.class)
public class IntegrationTestBackupRestore extends IntegrationTestBase {
  private static final String CLASS_NAME = IntegrationTestBackupRestore.class.getSimpleName();
  protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBackupRestore.class);
  protected static final String NUMBER_OF_TABLES_KEY = "num_tables";
  protected static final String COLUMN_NAME = "f";
  protected static final String REGION_COUNT_KEY = "regions_per_rs";
  protected static final String REGIONSERVER_COUNT_KEY = "region_servers";
  protected static final String ROWS_PER_ITERATION_KEY = "rows_in_iteration";
  protected static final String NUM_ITERATIONS_KEY = "num_iterations";
  protected static final int DEFAULT_REGION_COUNT = 10;
  protected static final int DEFAULT_REGIONSERVER_COUNT = 5;
  protected static final int DEFAULT_NUMBER_OF_TABLES = 1;
  protected static final int DEFAULT_NUM_ITERATIONS = 10;
  protected static final int DEFAULT_ROWS_IN_ITERATION = 500000;
  protected static final String SLEEP_TIME_KEY = "sleeptime";
  // short default interval because tests don't run very long.
  protected static final long SLEEP_TIME_DEFAULT = 50000L;

  protected static int rowsInIteration;
  protected static int regionsCountPerServer;
  protected static int regionServerCount;

  protected static int numIterations;
  protected static int numTables;
  protected static TableName[] tableNames;
  protected long sleepTime;
  protected static Object lock = new Object();

  private static String BACKUP_ROOT_DIR = "backupIT";
  @Override
  @Before
  public void setUp() throws Exception {
    util = new IntegrationTestingUtility();
    Configuration conf = util.getConfiguration();
    regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT);
    regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT);
    rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION);
    numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS);
    numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES);
    sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT);
    enableBackup(conf);
    LOG.info("Initializing cluster with {} region servers.", regionServerCount);
    util.initializeCluster(regionServerCount);
    LOG.info("Cluster initialized and ready");
  }

  @After
  public void tearDown() throws IOException {
    LOG.info("Cleaning up after test.");
    if (util.isDistributedCluster()) {
      deleteTablesIfAny();
      LOG.info("Cleaning up after test. Deleted tables");
      cleanUpBackupDir();
    }
    LOG.info("Restoring cluster.");
    util.restoreCluster();
    LOG.info("Cluster restored.");
  }

  @Override
  public void setUpMonkey() throws Exception {
    // Periodically restart a random region server, excluding the one hosting hbase:meta
    Policy p =
      new PeriodicRandomActionPolicy(sleepTime, new RestartRandomRsExceptMetaAction(sleepTime));
    this.monkey = new PolicyBasedChaosMonkey(util, p);
    startMonkey();
  }

  private void deleteTablesIfAny() throws IOException {
    for (TableName table : tableNames) {
      util.deleteTableIfAny(table);
    }
  }

  private void createTables() throws Exception {
    tableNames = new TableName[numTables];
    for (int i = 0; i < numTables; i++) {
      tableNames[i] = TableName.valueOf(CLASS_NAME + ".table." + i);
    }
    for (TableName table : tableNames) {
      createTable(table);
    }
  }

  private void enableBackup(Configuration conf) {
    // Enable backup
    conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
    BackupManager.decorateMasterConfiguration(conf);
    BackupManager.decorateRegionServerConfiguration(conf);
  }

  private void cleanUpBackupDir() throws IOException {
    FileSystem fs = FileSystem.get(util.getConfiguration());
    fs.delete(new Path(BACKUP_ROOT_DIR), true);
  }

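  /**
   * End-to-end test entry point: places the backup root under the test filesystem, creates the
   * configured number of pre-split tables, and runs the backup/restore cycle for each table in its
   * own worker thread (see {@link #runTestSingle(TableName)}).
   */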
  @Test
  public void testBackupRestore() throws Exception {
    BACKUP_ROOT_DIR = util.getDataTestDirOnTestFS() + Path.SEPARATOR + BACKUP_ROOT_DIR;
    createTables();
    runTestMulti();
  }

  private void runTestMulti() throws IOException {
    LOG.info("IT backup & restore started");
    Thread[] workers = new Thread[numTables];
    for (int i = 0; i < numTables; i++) {
      final TableName table = tableNames[i];
      Runnable r = () -> {
        try {
          runTestSingle(table);
        } catch (IOException e) {
          LOG.error("Failed", e);
          Assert.fail(e.getMessage());
        }
      };
      workers[i] = new Thread(r);
      workers[i].start();
    }
    // Wait for all workers to finish
    for (Thread t : workers) {
      Uninterruptibles.joinUninterruptibly(t);
    }
    LOG.info("IT backup & restore finished");
  }

  private void createTable(TableName tableName) throws Exception {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    TableDescriptor desc = builder.build();
    ColumnFamilyDescriptorBuilder cbuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_NAME.getBytes(Charset.defaultCharset()));
    ColumnFamilyDescriptor[] columns = new ColumnFamilyDescriptor[] { cbuilder.build() };
    LOG.info("Creating table {} with {} splits.", tableName,
      regionsCountPerServer * regionServerCount);
    long startTime = EnvironmentEdgeManager.currentTime();
    createPreSplitLoadTestTable(util.getConfiguration(), desc, columns, regionsCountPerServer);
    util.waitTableAvailable(tableName);
    long endTime = EnvironmentEdgeManager.currentTime();
    LOG.info("Pre-split table created successfully in {}ms.", (endTime - startTime));
  }

  private void loadData(TableName table, int numRows) throws IOException {
    Connection conn = util.getConnection();
    // Insert random rows, then flush so the data is persisted before the backup runs
    try (Table t1 = conn.getTable(table); Admin admin = conn.getAdmin()) {
      util.loadRandomRows(t1, new byte[] { 'f' }, 100, numRows);
      admin.flush(table);
    }
  }

  private String backup(BackupRequest request, BackupAdmin client) throws IOException {
    return client.backupTables(request);
  }

  private void restore(RestoreRequest request, BackupAdmin client) throws IOException {
    client.restore(request);
  }

  private void merge(String[] backupIds, BackupAdmin client) throws IOException {
    client.mergeBackups(backupIds);
  }

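  /**
   * Runs the backup/restore cycle for a single table: loads data and takes a full backup, then on
   * each subsequent iteration loads more data, takes an incremental backup, and restores first the
   * previous and then the latest backup, verifying row counts after each restore. Finally, merges
   * all incremental backups, restores the merged result, and verifies the total row count.
   */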
  private void runTestSingle(TableName table) throws IOException {

    List<String> backupIds = new ArrayList<>();

    try (Connection conn = util.getConnection(); Admin admin = conn.getAdmin();
      BackupAdmin client = new BackupAdminImpl(conn)) {

      // #0 - insert some data into table 'table'
      loadData(table, rowsInIteration);

      // #1 - create a full backup of the table first
      LOG.info("create full backup image for {}", table);
      List<TableName> tables = Lists.newArrayList(table);
      BackupRequest.Builder builder = new BackupRequest.Builder();
      BackupRequest request = builder.withBackupType(BackupType.FULL).withTableList(tables)
        .withTargetRootDir(BACKUP_ROOT_DIR).build();

      String backupIdFull = backup(request, client);
      assertTrue(checkSucceeded(backupIdFull));

      backupIds.add(backupIdFull);
      // Now continue with incremental backups
      int count = 1;
      while (count++ < numIterations) {

        // Load data
        loadData(table, rowsInIteration);
        // Do incremental backup
        builder = new BackupRequest.Builder();
        request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables)
          .withTargetRootDir(BACKUP_ROOT_DIR).build();
        String backupId = backup(request, client);
        assertTrue(checkSucceeded(backupId));
        backupIds.add(backupId);

        // Restore the previous incremental backup for the table, with overwrite
        String previousBackupId = backupIds.get(backupIds.size() - 2);
        restoreVerifyTable(conn, client, table, previousBackupId, rowsInIteration * (count - 1));
        // Restore the last incremental backup for the table, with overwrite
        restoreVerifyTable(conn, client, table, backupId, rowsInIteration * count);
      }
      // Now merge all incremental backups and restore
      String[] incBackupIds = allIncremental(backupIds);
      merge(incBackupIds, client);
      // Restore the last one
      String backupId = incBackupIds[incBackupIds.length - 1];
      // restore the incremental backup for the table, with overwrite
      TableName[] tablesRestoreIncMultiple = new TableName[] { table };
      restore(createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null,
        true), client);
      try (Table hTable = conn.getTable(table)) {
        Assert.assertEquals(rowsInIteration * numIterations, util.countRows(hTable));
      }
      LOG.info("{} loop {} finished.", Thread.currentThread().getName(), (count - 1));
    }
  }

  private void restoreVerifyTable(Connection conn, BackupAdmin client, TableName table,
    String backupId, long expectedRows) throws IOException {

    TableName[] tablesRestoreIncMultiple = new TableName[] { table };
    restore(
      createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null, true),
      client);
    try (Table hTable = conn.getTable(table)) {
      Assert.assertEquals(expectedRows, util.countRows(hTable));
    }
  }

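  /**
   * Returns the incremental backup ids only, i.e. every id except the first one, which belongs to
   * the initial full backup.
   */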
  private String[] allIncremental(List<String> backupIds) {
    int size = backupIds.size();
    backupIds = backupIds.subList(1, size);
    String[] arr = new String[size - 1];
    backupIds.toArray(arr);
    return arr;
  }

  /** Returns whether the backup with the given id completed successfully. */
  protected boolean checkSucceeded(String backupId) throws IOException {
    BackupInfo status = getBackupInfo(backupId);
    if (status == null) {
      return false;
    }
    return status.getState() == BackupState.COMPLETE;
  }

  private BackupInfo getBackupInfo(String backupId) throws IOException {
    try (BackupSystemTable table = new BackupSystemTable(util.getConnection())) {
      return table.readBackupInfo(backupId);
    }
  }

  /**
   * Creates a restore request for the given backup.
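   * <p>
   * For example, this test restores each table in place, overwriting its current contents:
   *
   * <pre>
   * restore(createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, new TableName[] { table },
   *   null, true), client);
   * </pre>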
   * @param backupRootDir directory where the backup is located
   * @param backupId      backup ID
   * @param check         check the backup
   * @param fromTables    table names to restore from
   * @param toTables      new table names to restore to
   * @param isOverwrite   overwrite the table(s)
   * @return an instance of RestoreRequest
   */
  public RestoreRequest createRestoreRequest(String backupRootDir, String backupId, boolean check,
    TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
    RestoreRequest.Builder builder = new RestoreRequest.Builder();
    // Note: "withOvewrite" (sic) is the actual method name in the RestoreRequest.Builder API
    return builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check)
      .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build();
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    enableBackup(getConf());
    LOG.debug("Initializing/checking cluster has {} servers", regionServerCount);
    util.initializeCluster(regionServerCount);
    LOG.debug("Done initializing/checking cluster");
  }

  /** Returns the exit status of the CLI run: 0 on success, -1 if backup is not enabled */
  @Override
  public int runTestFromCommandLine() throws Exception {
    // Check if backup is enabled
    if (!BackupManager.isBackupEnabled(getConf())) {
      System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
      return -1;
    }
    System.out.println(BackupRestoreConstants.VERIFY_BACKUP);
    testBackupRestore();
    return 0;
  }

  @Override
  public TableName getTablename() {
    // Only valid when the monkey is CALM (i.e., no chaos monkey is running)
    return null;
  }

  @Override
  protected Set<String> getColumnFamilies() {
    // Only valid when the monkey is CALM (i.e., no chaos monkey is running)
    return null;
  }

  @Override
  protected void addOptions() {
    addOptWithArg(REGIONSERVER_COUNT_KEY,
      "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'");
    addOptWithArg(REGION_COUNT_KEY,
      "Number of regions per region server. Default: " + DEFAULT_REGION_COUNT);
    addOptWithArg(ROWS_PER_ITERATION_KEY,
      "Total number of data rows to load during one iteration. Default: "
        + DEFAULT_ROWS_IN_ITERATION);
    addOptWithArg(NUM_ITERATIONS_KEY,
      "Total number of iterations. Default: " + DEFAULT_NUM_ITERATIONS);
    addOptWithArg(NUMBER_OF_TABLES_KEY,
      "Total number of tables in the test. Default: " + DEFAULT_NUMBER_OF_TABLES);
    addOptWithArg(SLEEP_TIME_KEY, "Interval in ms between chaos monkey restarts of a random "
      + "region server. Default: " + SLEEP_TIME_DEFAULT);
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    regionsCountPerServer = Integer
      .parseInt(cmd.getOptionValue(REGION_COUNT_KEY, Integer.toString(DEFAULT_REGION_COUNT)));
    regionServerCount = Integer.parseInt(
      cmd.getOptionValue(REGIONSERVER_COUNT_KEY, Integer.toString(DEFAULT_REGIONSERVER_COUNT)));
    rowsInIteration = Integer.parseInt(
      cmd.getOptionValue(ROWS_PER_ITERATION_KEY, Integer.toString(DEFAULT_ROWS_IN_ITERATION)));
    numIterations = Integer
      .parseInt(cmd.getOptionValue(NUM_ITERATIONS_KEY, Integer.toString(DEFAULT_NUM_ITERATIONS)));
    numTables = Integer.parseInt(
      cmd.getOptionValue(NUMBER_OF_TABLES_KEY, Integer.toString(DEFAULT_NUMBER_OF_TABLES)));
    sleepTime =
      Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, Long.toString(SLEEP_TIME_DEFAULT)));

    LOG.info(MoreObjects.toStringHelper("Parsed Options")
      .add(REGION_COUNT_KEY, regionsCountPerServer).add(REGIONSERVER_COUNT_KEY, regionServerCount)
      .add(ROWS_PER_ITERATION_KEY, rowsInIteration).add(NUM_ITERATIONS_KEY, numIterations)
      .add(NUMBER_OF_TABLES_KEY, numTables).add(SLEEP_TIME_KEY, sleepTime).toString());
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestBackupRestore(), args);
    System.exit(status);
  }
}