001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.fail;
021
022import org.apache.hadoop.hbase.HBaseTestingUtil;
023import org.apache.hadoop.hbase.UnknownScannerException;
024import org.apache.hadoop.hbase.client.Connection;
025import org.apache.hadoop.hbase.client.ConnectionFactory;
026import org.apache.hadoop.hbase.client.Result;
027import org.apache.hadoop.hbase.client.ResultScanner;
028import org.apache.hadoop.hbase.client.Scan;
029import org.apache.hadoop.hbase.client.Table;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034public abstract class TestReplicationKillRS extends TestReplicationBase {
035
036  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationKillRS.class);
037
038  /**
039   * Load up 1 tables over 2 region servers and kill a source during the upload. The failover
040   * happens internally. WARNING this test sometimes fails because of HBASE-3515
041   */
042  protected void loadTableAndKillRS(HBaseTestingUtil util) throws Exception {
043    // killing the RS with hbase:meta can result into failed puts until we solve
044    // IO fencing
045    int rsToKill1 = util.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
046
047    // Takes about 20 secs to run the full loading, kill around the middle
048    Thread killer = killARegionServer(util, 5000, rsToKill1);
049    Result[] res;
050    int initialCount;
051    try (Connection conn = ConnectionFactory.createConnection(CONF1)) {
052      try (Table table = conn.getTable(tableName)) {
053        LOG.info("Start loading table");
054        initialCount = UTIL1.loadTable(table, famName);
055        LOG.info("Done loading table");
056        killer.join(5000);
057        LOG.info("Done waiting for threads");
058
059        while (true) {
060          try (ResultScanner scanner = table.getScanner(new Scan())) {
061            res = scanner.next(initialCount);
062            break;
063          } catch (UnknownScannerException ex) {
064            LOG.info("Cluster wasn't ready yet, restarting scanner");
065          }
066        }
067      }
068    }
069    // Test we actually have all the rows, we may miss some because we
070    // don't have IO fencing.
071    if (res.length != initialCount) {
072      LOG.warn("We lost some rows on the master cluster!");
073      // We don't really expect the other cluster to have more rows
074      initialCount = res.length;
075    }
076
077    int lastCount = 0;
078    final long start = EnvironmentEdgeManager.currentTime();
079    int i = 0;
080    try (Connection conn = ConnectionFactory.createConnection(CONF2)) {
081      try (Table table = conn.getTable(tableName)) {
082        while (true) {
083          if (i == NB_RETRIES - 1) {
084            fail("Waited too much time for queueFailover replication. " + "Waited "
085              + (EnvironmentEdgeManager.currentTime() - start) + "ms.");
086          }
087          Result[] res2;
088          try (ResultScanner scanner = table.getScanner(new Scan())) {
089            res2 = scanner.next(initialCount * 2);
090          }
091          if (res2.length < initialCount) {
092            if (lastCount < res2.length) {
093              i--; // Don't increment timeout if we make progress
094            } else {
095              i++;
096            }
097            lastCount = res2.length;
098            LOG.info(
099              "Only got " + lastCount + " rows instead of " + initialCount + " current i=" + i);
100            Thread.sleep(SLEEP_TIME * 2);
101          } else {
102            break;
103          }
104        }
105      }
106    }
107  }
108
109  private static Thread killARegionServer(final HBaseTestingUtil utility, final long timeout,
110    final int rs) {
111    Thread killer = new Thread() {
112      @Override
113      public void run() {
114        try {
115          Thread.sleep(timeout);
116          utility.getHBaseCluster().getRegionServer(rs).stop("Stopping as part of the test");
117        } catch (Exception e) {
118          LOG.error("Couldn't kill a region server", e);
119        }
120      }
121    };
122    killer.setDaemon(true);
123    killer.start();
124    return killer;
125  }
126}