/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertNotNull;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;

/**
 * Tests region replication by setting up region replicas and verifying async wal replication
 * replays the edits to the secondary region in various scenarios.
 */
@Category({ FlakeyTests.class, LargeTests.class })
public class TestRegionReplicaReplication {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionReplicaReplication.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicaReplication.class);

  /** Number of region servers started in the mini cluster and scanned for online regions. */
  private static final int NB_SERVERS = 2;

  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();

  /** Supplies the running test method's name, which some tests use as the table name. */
  @Rule
  public TestName name = new TestName();

  /**
   * Applies the test configuration (region replica replication enabled, plus assorted timing and
   * retry settings) and starts a mini cluster with {@link #NB_SERVERS} region servers.
   */
  @BeforeClass
  public static void beforeClass() throws Exception {
    Configuration conf = HTU.getConfiguration();
    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
    conf.setInt("replication.source.size.capacity", 10240);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 10);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);

    HTU.startMiniCluster(NB_SERVERS);
  }

  /** Shuts down the mini cluster started in {@link #beforeClass()}. */
  @AfterClass
  public static void afterClass() throws Exception {
    HTU.shutdownMiniCluster();
  }

  /**
   * Creates a table with the given number of region replicas, writes rows [0, 1000) to it (and
   * rows [6000, 7000) to a second table created without replicas), then waits until the rows
   * written to the replicated table are readable from every secondary region.
   * @param regionReplication number of region replicas to configure on the table
   * @param skipWAL           whether the table is created with {@link Durability#SKIP_WAL}
   */
  private void testRegionReplicaReplication(int regionReplication, boolean skipWAL)
    throws Exception {
    // test region replica replication. Create a table with single region, write some data
    // ensure that data is replicated to the secondary region
    TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
      + regionReplication + (skipWAL ? "_skipWAL" : ""));
    TableDescriptorBuilder builder = HTU
      .createModifyableTableDescriptor(tableName,
        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
      .setRegionReplication(regionReplication);
    if (skipWAL) {
      builder.setDurability(Durability.SKIP_WAL);
    }
    TableDescriptor htd = builder.build();
    createOrEnableTableWithRetries(htd, true);
    TableName tableNameNoReplicas =
      TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
    HTU.deleteTableIfAny(tableNameNoReplicas);
    HTU.createTable(tableNameNoReplicas, HBaseTestingUtil.fam1);

    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(tableName);
      Table tableNoReplicas = connection.getTable(tableNameNoReplicas)) {
      // load some data to the non-replicated table
      HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtil.fam1, 6000, 7000);

      // load the data to the table
      HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);

      verifyReplication(tableName, regionReplication, 0, 1000);
    } finally {
      HTU.deleteTableIfAny(tableNameNoReplicas);
    }
  }

  /**
   * Waits until rows [startRow, endRow) are present in every secondary region of the table.
   * Delegates to the five-argument overload with {@code present == true}.
   */
  private void verifyReplication(TableName tableName, int regionReplication, final int startRow,
    final int endRow) throws Exception {
    verifyReplication(tableName, regionReplication, startRow, endRow, true);
  }

  /**
   * Locates the online region of every replica of the table across the region servers, then for
   * each secondary replica (replica id 1 and above) polls until
   * {@code HTU.verifyNumericRows(region, fam1, startRow, endRow, present)} succeeds.
   * @param tableName         table whose replicas are checked
   * @param regionReplication number of replicas expected to be online
   * @param startRow          first row passed to the verification
   * @param endRow            end row passed to the verification
   * @param present           forwarded to {@code verifyNumericRows}; callers pass {@code false}
   *                          when the rows are expected to be absent from the replicas
   */
  private void verifyReplication(TableName tableName, int regionReplication, final int startRow,
    final int endRow, final boolean present) throws Exception {
    // find the regions
    final Region[] regions = new Region[regionReplication];

    for (int i = 0; i < NB_SERVERS; i++) {
      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
      List<HRegion> onlineRegions = rs.getRegions(tableName);
      for (HRegion region : onlineRegions) {
        regions[region.getRegionInfo().getReplicaId()] = region;
      }
    }

    // Every replica slot must have been filled; name the missing one to ease diagnosis.
    for (int replicaId = 0; replicaId < regions.length; replicaId++) {
      assertNotNull("No online region found for replica " + replicaId + " of " + tableName,
        regions[replicaId]);
    }

    // Index 0 is skipped: only the secondary replicas are verified here.
    for (int i = 1; i < regionReplication; i++) {
      final Region region = regions[i];
      // wait until all the data is replicated to all secondary regions
      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          LOG.info("verifying replication for region replica:{}", region.getRegionInfo());
          try {
            HTU.verifyNumericRows(region, HBaseTestingUtil.fam1, startRow, endRow, present);
          } catch (Throwable ex) {
            // Any failure (including assertion errors) just means "not there yet"; keep polling.
            LOG.warn("Verification from secondary region is not complete yet", ex);
            // still wait
            return false;
          }
          return true;
        }
      });
    }
  }

  @Test
  public void testRegionReplicaReplicationWith2Replicas() throws Exception {
    testRegionReplicaReplication(2, false);
    testRegionReplicaReplication(2, true);
  }

  @Test
  public void testRegionReplicaReplicationWith3Replicas() throws Exception {
    testRegionReplicaReplication(3, false);
    testRegionReplicaReplication(3, true);
  }

  @Test
  public void testRegionReplicaReplicationWith10Replicas() throws Exception {
    testRegionReplicaReplication(10, false);
    testRegionReplicaReplication(10, true);
  }

  /**
   * With memstore replication disabled on the table, freshly written rows are expected to be
   * absent from the replicas until the table is flushed, after which all rows written so far are
   * expected to be present.
   */
  @Test
  public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
    int regionReplication = 3;
    TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
      .setRegionReplication(regionReplication).setRegionMemStoreReplication(false).build();
    createOrEnableTableWithRetries(htd, true);
    final TableName tableName = htd.getTableName();
    // try-with-resources closes the table and then the connection, even if opening the table
    // or closing it throws.
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(tableName)) {
      // write data to the primary. The replicas should not receive the data
      final int STEP = 100;
      for (int i = 0; i < 3; ++i) {
        final int startRow = i * STEP;
        final int endRow = (i + 1) * STEP;
        LOG.info("Writing data from {} to {}", startRow, endRow);
        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, startRow, endRow);
        verifyReplication(tableName, regionReplication, startRow, endRow, false);

        // Flush the table, now the data should show up in the replicas
        LOG.info("flushing table");
        HTU.flush(tableName);
        verifyReplication(tableName, regionReplication, 0, endRow, true);
      }
    }
  }

  @Test
  public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
    // Tests a table with region replication 3. Writes some data, and causes flushes and
    // compactions. Verifies that the data is readable from the replicas. Note that this
    // does not test whether the replicas actually pick up flushed files and apply compaction
    // to their stores
    int regionReplication = 3;
    TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
      .setRegionReplication(regionReplication).build();
    createOrEnableTableWithRetries(htd, true);
    final TableName tableName = htd.getTableName();

    // try-with-resources closes the table and then the connection, even if opening the table
    // or closing it throws.
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(tableName)) {
      // load the data to the table
      for (int i = 0; i < 6000; i += 1000) {
        LOG.info("Writing data from {} to {}", i, i + 1000);
        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, i, i + 1000);
        LOG.info("flushing table");
        HTU.flush(tableName);
        LOG.info("compacting table");
        HTU.compact(tableName, false);
      }

      // NOTE(review): rows [0, 6000) are written above but only [0, 1000) are verified here;
      // confirm whether the narrower range is intentional.
      verifyReplication(tableName, regionReplication, 0, 1000);
    }
  }

  /**
   * Runs a create-table or enable-table admin operation, retrying (with a one second pause, up to
   * 50 times) whenever it fails with an {@link IOException} whose cause is a
   * {@link ReplicationException}. This helper never throws: any other {@link IOException}, and
   * retry exhaustion, are logged and the method returns.
   * @param htd                  descriptor of the table to create or enable
   * @param createTableOperation {@code true} to create the table, {@code false} to enable it
   */
  private void createOrEnableTableWithRetries(TableDescriptor htd, boolean createTableOperation) {
    final int maxTries = 50;
    boolean continueToRetry = true;
    int tries = 0;
    while (continueToRetry && tries < maxTries) {
      try {
        // Assume success; the catch block re-arms the loop only for retryable failures.
        continueToRetry = false;
        if (createTableOperation) {
          HTU.getAdmin().createTable(htd);
        } else {
          HTU.getAdmin().enableTable(htd.getTableName());
        }
      } catch (IOException e) {
        if (e.getCause() instanceof ReplicationException) {
          continueToRetry = true;
          tries++;
          LOG.warn("Attempt {} of {} on table {} failed with a replication error, retrying",
            tries, maxTries, htd.getTableName(), e);
          Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
        } else {
          // Not retryable. Kept best-effort (no rethrow) but no longer silent.
          LOG.warn("Operation on table {} failed with a non-retryable error, not retrying",
            htd.getTableName(), e);
        }
      }
    }
    if (continueToRetry) {
      LOG.warn("Giving up on table {} after {} failed attempts", htd.getTableName(), tries);
    }
  }
}