/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs client operations against a mini cluster whose client-side configuration
 * ({@link HConstants#CLIENT_ZOOKEEPER_QUORUM} / {@link HConstants#CLIENT_ZOOKEEPER_CLIENT_PORT})
 * points at a dedicated {@link MiniZooKeeperCluster} started by this test, with
 * {@link ZKConnectionRegistry} configured as the connection registry.
 * <p>
 * All tests share one mini cluster and one client ZooKeeper cluster, so a test that disturbs
 * either of them must restore it before returning.
 */
@Category({ ClientTests.class, MediumTests.class })
public class TestSeparateClientZKCluster {
  private static final Logger LOG = LoggerFactory.getLogger(TestSeparateClientZKCluster.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final File clientZkDir =
    new File(TEST_UTIL.getDataTestDir("TestSeparateClientZKCluster").toString());
  private static final int ZK_SESSION_TIMEOUT = 5000;
  /** Port the client ZooKeeper cluster is asked to use when it is first started. */
  private static final int PREFERRED_CLIENT_ZK_PORT = 21828;
  private static MiniZooKeeperCluster clientZkCluster;

  private final byte[] family = Bytes.toBytes("cf");
  private final byte[] qualifier = Bytes.toBytes("c1");
  private final byte[] row = Bytes.toBytes("row");
  private final byte[] value = Bytes.toBytes("v1");
  private final byte[] newVal = Bytes.toBytes("v2");

  @Rule
  public TableNameTestRule name = new TableNameTestRule();

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSeparateClientZKCluster.class);

  /**
   * Starts the dedicated client ZooKeeper cluster, points the client configuration at it, and
   * then starts a mini cluster with 2 masters, 3 region servers and 3 data nodes.
   */
  @BeforeClass
  public static void beforeAllTests() throws Exception {
    clientZkCluster = new MiniZooKeeperCluster(TEST_UTIL.getConfiguration());
    clientZkCluster.setDefaultClientPort(PREFERRED_CLIENT_ZK_PORT);
    // Configure clients with the port startup() reports rather than assuming the preferred port
    // was free on this host.
    int clientZkPort = clientZkCluster.startup(clientZkDir);
    if (clientZkPort <= 0) {
      throw new IOException("Failed to start client ZK cluster, reported port " + clientZkPort);
    }
    // Pin the port actually in use so that a later restart asks for the same one.
    clientZkCluster.setDefaultClientPort(clientZkPort);
    // start log counter
    TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", 3);
    TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1);
    // core settings for testing client ZK cluster
    TEST_UTIL.getConfiguration().setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
      ZKConnectionRegistry.class, ConnectionRegistry.class);
    TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
    TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort);
    // reduce the ZK session timeout to make session expiration easier to trigger
    TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT);
    // Start a cluster with 2 masters and 3 regionservers.
    StartTestingClusterOption option =
      StartTestingClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
    TEST_UTIL.startMiniCluster(option);
  }

  /**
   * Shuts down the mini cluster and the client ZooKeeper cluster and deletes the client ZooKeeper
   * data directory. Each step runs even if an earlier one throws.
   */
  @AfterClass
  public static void afterAllTests() throws Exception {
    try {
      TEST_UTIL.shutdownMiniCluster();
    } finally {
      try {
        // null when beforeAllTests failed before the cluster object was created
        if (clientZkCluster != null) {
          clientZkCluster.shutdown();
        }
      } finally {
        FileUtils.deleteDirectory(clientZkDir);
      }
    }
  }

  /** Blocks each test until a namespace listing succeeds through a fresh {@link Admin}. */
  @Before
  public void setUp() throws IOException {
    try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
      waitForNewMasterUpAndAddressSynced(admin);
    }
  }

  /**
   * Polls for up to 30 seconds until {@code admin.listNamespaces()} returns at least one
   * namespace. Failures while polling are logged and treated as "not ready yet".
   */
  private void waitForNewMasterUpAndAddressSynced(Admin admin) {
    TEST_UTIL.waitFor(30000, () -> {
      try {
        return admin.listNamespaces().length > 0;
      } catch (Exception e) {
        LOG.warn("failed to list namespaces", e);
        return false;
      }
    });
  }

  /** Builds a descriptor for table {@code tn} with the single column family used by the tests. */
  private TableDescriptor newTableDescriptor(TableName tn) {
    ColumnFamilyDescriptorBuilder cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
    return TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build()).build();
  }

  /** Builds a {@link Put} writing {@code cellValue} to the test row, family and qualifier. */
  private Put newPut(byte[] cellValue) {
    Put put = new Put(row);
    put.addColumn(family, qualifier, cellValue);
    return put;
  }

  /**
   * Returns the name of the first live region server whose name differs from {@code excluded}.
   * @throws AssertionError if every live region server equals {@code excluded}
   */
  private static ServerName pickLiveServerOtherThan(SingleProcessHBaseCluster cluster,
    ServerName excluded) {
    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
      ServerName candidate = rst.getRegionServer().getServerName();
      if (!candidate.equals(excluded)) {
        return candidate;
      }
    }
    throw new AssertionError("No live region server other than " + excluded);
  }

  /** Sets the meta table replica count and waits until the locator reports that many regions. */
  private static void setMetaReplicasAndWait(Admin admin, RegionLocator locator, int replicas)
    throws Exception {
    HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, replicas);
    TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == replicas);
  }

  /** Creates a table, writes one cell and reads it back. */
  @Test
  public void testBasicOperation() throws Exception {
    TableName tn = name.getTableName();
    Connection conn = TEST_UTIL.getConnection();
    try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) {
      // create table
      admin.createTable(newTableDescriptor(tn));
      // test simple get and put
      table.put(newPut(value));
      Result result = table.get(new Get(row));
      LOG.debug("Result: {}", Bytes.toString(result.getValue(family, qualifier)));
      assertArrayEquals(value, result.getValue(family, qualifier));
    }
  }

  /**
   * Stops the active master, waits for another master to finish initializing, and checks that an
   * {@link Admin} obtained before the switch can still list namespaces.
   */
  @Test
  public void testMasterSwitch() throws Exception {
    // get an admin instance and issue some request first
    Connection conn = TEST_UTIL.getConnection();
    try (Admin admin = conn.getAdmin()) {
      LOG.debug("Tables: {}", admin.listTableDescriptors());
      SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
      // switch active master
      HMaster master = cluster.getMaster();
      master.stopMaster();
      LOG.info("Stopped master {}", master.getServerName());
      TEST_UTIL.waitFor(30000, () -> !master.isAlive());
      LOG.info("Shutdown master {}", master.getServerName());
      TEST_UTIL.waitFor(30000,
        () -> cluster.getMaster() != null && cluster.getMaster().isInitialized());
      LOG.info("Got master {}", cluster.getMaster().getServerName());
      // confirm client access still works
      waitForNewMasterUpAndAddressSynced(admin);
    }
  }

  /**
   * Moves the meta region and then the user region to other region servers, and checks that a
   * client which issued requests before the moves can still write and read.
   */
  @Test
  public void testMetaRegionMove() throws Exception {
    TableName tn = name.getTableName();
    Connection conn = TEST_UTIL.getConnection();
    try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn);
      RegionLocator locator = conn.getRegionLocator(tn)) {
      SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
      // create table
      admin.createTable(newTableDescriptor(tn));
      // issue some requests to cache the region location
      table.put(newPut(value));
      Get get = new Get(row);
      table.get(get);
      // move meta region and confirm client could detect
      ServerName metaDest = pickLiveServerOtherThan(cluster, cluster.getServerHoldingMeta());
      admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), metaDest);
      LOG.debug("Finished moving meta");
      // invalidate client cache
      RegionInfo region = locator.getRegionLocation(row).getRegion();
      ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName());
      ServerName regionDest = pickLiveServerOtherThan(cluster, currentServer);
      admin.move(region.getEncodedNameAsBytes(), regionDest);
      LOG.debug("Finished moving user region");
      table.put(newPut(newVal));
      Result result = table.get(get);
      LOG.debug("Result: {}", Bytes.toString(result.getValue(family, qualifier)));
      assertArrayEquals(newVal, result.getValue(family, qualifier));
    }
  }

  /**
   * Shuts down the client ZooKeeper cluster, stops the region server holding meta, waits for meta
   * to be assigned again, restarts the client ZooKeeper cluster and checks that a new read
   * succeeds.
   */
  @Test
  public void testMetaMoveDuringClientZkClusterRestart() throws Exception {
    TableName tn = name.getTableName();
    Connection conn = TEST_UTIL.getConnection();
    try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) {
      // create table
      admin.createTable(newTableDescriptor(tn));
      // put some data
      table.put(newPut(value));
      // invalidate connection cache
      conn.clearRegionLocationCache();
      // stop client zk cluster
      clientZkCluster.shutdown();
      try {
        // stop current meta server and confirm the server shutdown process
        // is not affected by client ZK crash
        SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
        int metaServerId = cluster.getServerWithMeta();
        HRegionServer metaServer = cluster.getRegionServer(metaServerId);
        metaServer.stop("Stop current RS holding meta region");
        while (metaServer.isAlive()) {
          Thread.sleep(200);
        }
        // wait for meta region online
        AssignmentTestingUtil.waitForAssignment(cluster.getMaster().getAssignmentManager(),
          RegionInfoBuilder.FIRST_META_REGIONINFO);
        // wait some long time to make sure we will retry sync data to client ZK until data set
        Thread.sleep(10000);
      } finally {
        // The client ZK cluster is shared by every test in this class; bring it back even when
        // the steps above fail so that one failure does not cascade into the other tests.
        clientZkCluster.startup(clientZkDir);
      }
      // new request should pass
      Result result = table.get(new Get(row));
      LOG.debug("Result: {}", Bytes.toString(result.getValue(family, qualifier)));
      assertArrayEquals(value, result.getValue(family, qualifier));
    }
  }

  /** Creates a table, writes one cell and reads it back through an {@link AsyncConnection}. */
  @Test
  public void testAsyncTable() throws Exception {
    TableName tn = name.getTableName();
    try (AsyncConnection asyncConn =
      ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
      asyncConn.getAdmin().createTable(newTableDescriptor(tn)).get();
      AsyncTable<?> table = asyncConn.getTable(tn);
      // put some data
      table.put(newPut(value)).get();
      // get and verify
      Result result = table.get(new Get(row)).get();
      LOG.debug("Result: {}", Bytes.toString(result.getValue(family, qualifier)));
      assertArrayEquals(value, result.getValue(family, qualifier));
    }
  }

  /**
   * Changes the meta table replica count 1 -> 3 -> 2 -> 1 and waits after each change until the
   * region locator reports the matching number of meta region locations.
   */
  @Test
  public void testChangeMetaReplicaCount() throws Exception {
    // Shared Admin obtained from TEST_UTIL; deliberately not closed here.
    Admin admin = TEST_UTIL.getAdmin();
    try (RegionLocator locator =
      TEST_UTIL.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
      assertEquals(1, locator.getAllRegionLocations().size());
      setMetaReplicasAndWait(admin, locator, 3);
      setMetaReplicasAndWait(admin, locator, 2);
      setMetaReplicasAndWait(admin, locator, 1);
    }
  }
}