/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

/**
 * Test handling of changes to the number of a peer's regionservers.
 */
@RunWith(Parameterized.class)
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationChangingPeerRegionservers.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestReplicationChangingPeerRegionservers.class);

  @SuppressWarnings("checkstyle:VisibilityModifier")
  @Parameter(0)
  public boolean serialPeer;

  @Parameter(1)
  public boolean syncPeer;

  @Override
  protected boolean isSerialPeer() {
    return serialPeer;
  }

  @Override
  protected boolean isSyncPeer() {
    return syncPeer;
  }

  @Parameters(name = "{index}: serialPeer={0}, syncPeer={1}")
  public static List<Object[]> parameters() {
    return ImmutableList.of(new Object[] { false, false }, new Object[] { false, true },
      new Object[] { true, false }, new Object[] { true, true });
  }

  @Before
  public void setUp() throws Exception {
    // Starting and stopping replication can make us miss new logs,
    // rolling like this makes sure the most recent one gets added to the queue
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    UTIL1.deleteTableData(tableName);
    // truncating the table will send one Delete per row to the slave cluster
    // in an async fashion, which is why we cannot just call deleteTableData on
    // utility2 since late writes could make it to the slave in some way.
    // Instead, we truncate the first table and wait for all the Deletes to
    // make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment timeout if we make progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  @Test
  public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
    LOG.info("testChangingNumberOfPeerRegionServers");
    SingleProcessHBaseCluster peerCluster = UTIL2.getMiniHBaseCluster();
    // This test wants two RS's up. We only run one generally so add one.
    peerCluster.startRegionServer();
    Waiter.waitFor(peerCluster.getConfiguration(), 30000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return peerCluster.getLiveRegionServerThreads().size() > 1;
      }
    });
    int numRS = peerCluster.getRegionServerThreads().size();

    doPutTest(Bytes.toBytes(1));

    int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
    peerCluster.stopRegionServer(rsToStop);
    peerCluster.waitOnRegionServer(rsToStop);

    // Sanity check
    assertEquals(numRS - 1, peerCluster.getRegionServerThreads().size());

    doPutTest(Bytes.toBytes(2));

    peerCluster.startRegionServer();

    // Sanity check
    assertEquals(numRS, peerCluster.getRegionServerThreads().size());

    doPutTest(Bytes.toBytes(3));
  }

  private void doPutTest(byte[] row) throws IOException, InterruptedException {
    Put put = new Put(row);
    put.addColumn(famName, row, row);

    if (htable1 == null) {
      htable1 = UTIL1.getConnection().getTable(tableName);
    }

    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(res.value(), row);
        break;
      }
    }
  }
}