001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.fail; 021 022import org.apache.hadoop.hbase.HBaseTestingUtility; 023import org.apache.hadoop.hbase.UnknownScannerException; 024import org.apache.hadoop.hbase.client.Connection; 025import org.apache.hadoop.hbase.client.ConnectionFactory; 026import org.apache.hadoop.hbase.client.Result; 027import org.apache.hadoop.hbase.client.ResultScanner; 028import org.apache.hadoop.hbase.client.Scan; 029import org.apache.hadoop.hbase.client.Table; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034public abstract class TestReplicationKillRS extends TestReplicationBase { 035 036 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationKillRS.class); 037 038 /** 039 * Load up 1 tables over 2 region servers and kill a source during the upload. The failover 040 * happens internally. WARNING this test sometimes fails because of HBASE-3515 041 */ 042 protected void loadTableAndKillRS(HBaseTestingUtility util) throws Exception { 043 // killing the RS with hbase:meta can result into failed puts until we solve 044 // IO fencing 045 int rsToKill1 = util.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0; 046 047 // Takes about 20 secs to run the full loading, kill around the middle 048 Thread killer = killARegionServer(util, 5000, rsToKill1); 049 Result[] res; 050 int initialCount; 051 try (Connection conn = ConnectionFactory.createConnection(CONF1)) { 052 try (Table table = conn.getTable(tableName)) { 053 LOG.info("Start loading table"); 054 initialCount = UTIL1.loadTable(table, famName); 055 LOG.info("Done loading table"); 056 killer.join(5000); 057 LOG.info("Done waiting for threads"); 058 059 while (true) { 060 try (ResultScanner scanner = table.getScanner(new Scan())) { 061 res = scanner.next(initialCount); 062 break; 063 } catch (UnknownScannerException ex) { 064 LOG.info("Cluster wasn't ready yet, restarting scanner"); 065 } 066 } 067 } 068 } 069 // Test we actually have all the rows, we may miss some because we 070 // don't have IO fencing. 071 if (res.length != initialCount) { 072 LOG.warn("We lost some rows on the master cluster!"); 073 // We don't really expect the other cluster to have more rows 074 initialCount = res.length; 075 } 076 077 int lastCount = 0; 078 final long start = EnvironmentEdgeManager.currentTime(); 079 int i = 0; 080 try (Connection conn = ConnectionFactory.createConnection(CONF2)) { 081 try (Table table = conn.getTable(tableName)) { 082 while (true) { 083 if (i == NB_RETRIES - 1) { 084 fail("Waited too much time for queueFailover replication. " + "Waited " 085 + (EnvironmentEdgeManager.currentTime() - start) + "ms."); 086 } 087 Result[] res2; 088 try (ResultScanner scanner = table.getScanner(new Scan())) { 089 res2 = scanner.next(initialCount * 2); 090 } 091 if (res2.length < initialCount) { 092 if (lastCount < res2.length) { 093 i--; // Don't increment timeout if we make progress 094 } else { 095 i++; 096 } 097 lastCount = res2.length; 098 LOG.info( 099 "Only got " + lastCount + " rows instead of " + initialCount + " current i=" + i); 100 Thread.sleep(SLEEP_TIME * 2); 101 } else { 102 break; 103 } 104 } 105 } 106 } 107 } 108 109 private static Thread killARegionServer(final HBaseTestingUtility utility, final long timeout, 110 final int rs) { 111 Thread killer = new Thread() { 112 @Override 113 public void run() { 114 try { 115 Thread.sleep(timeout); 116 utility.getHBaseCluster().getRegionServer(rs).stop("Stopping as part of the test"); 117 } catch (Exception e) { 118 LOG.error("Couldn't kill a region server", e); 119 } 120 } 121 }; 122 killer.setDaemon(true); 123 killer.start(); 124 return killer; 125 } 126}