001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collection; 027import java.util.Collections; 028import java.util.EnumSet; 029import java.util.List; 030import java.util.Map; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.ClusterMetrics; 033import org.apache.hadoop.hbase.ClusterMetrics.Option; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.RegionMetrics; 038import org.apache.hadoop.hbase.ServerMetrics; 039import org.apache.hadoop.hbase.ServerName; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.regionserver.HRegion; 042import org.apache.hadoop.hbase.regionserver.HRegionServer; 043import org.apache.hadoop.hbase.testclassification.ClientTests; 044import org.apache.hadoop.hbase.testclassification.LargeTests; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 047import org.junit.BeforeClass; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051import org.junit.runner.RunWith; 052import org.junit.runners.Parameterized; 053 054import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 055import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 056 057@RunWith(Parameterized.class) 058@Category({ ClientTests.class, LargeTests.class }) 059public class TestAsyncClusterAdminApi extends TestAsyncAdminBase { 060 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestAsyncClusterAdminApi.class); 064 065 @BeforeClass 066 public static void setUpBeforeClass() throws Exception { 067 068 setUpConfigurationFiles(TEST_UTIL); 069 TEST_UTIL.getConfiguration().setInt(HConstants.MASTER_INFO_PORT, 0); 070 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); 071 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); 072 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 073 TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); 074 075 TEST_UTIL.startMiniCluster(2); 076 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 077 addResourceToRegionServerConfiguration(TEST_UTIL); 078 } 079 080 @Test 081 public void testGetMasterInfoPort() throws Exception { 082 assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getInfoServer().getPort(), 083 (int) admin.getMasterInfoPort().get()); 084 } 085 086 @Test 087 public void testRegionServerOnlineConfigChange() throws Exception { 088 replaceHBaseSiteXML(); 089 admin.getRegionServers().get().forEach(server -> admin.updateConfiguration(server).join()); 090 091 // Check the configuration of the RegionServers 092 TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { 093 Configuration conf = thread.getRegionServer().getConfiguration(); 094 assertEquals(1000, conf.getInt("hbase.custom.config", 0)); 095 }); 096 097 restoreHBaseSiteXML(); 098 } 099 100 @Test 101 public void testMasterOnlineConfigChange() throws Exception { 102 replaceHBaseSiteXML(); 103 ServerName master = admin.getMaster().get(); 104 admin.updateConfiguration(master).join(); 105 admin.getBackupMasters().get() 106 .forEach(backupMaster -> admin.updateConfiguration(backupMaster).join()); 107 108 // Check the configuration of the Masters 109 TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> { 110 Configuration conf = thread.getMaster().getConfiguration(); 111 assertEquals(1000, conf.getInt("hbase.custom.config", 0)); 112 }); 113 114 restoreHBaseSiteXML(); 115 } 116 117 @Test 118 public void testAllClusterOnlineConfigChange() throws IOException { 119 replaceHBaseSiteXML(); 120 admin.updateConfiguration().join(); 121 122 // Check the configuration of the Masters 123 TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> { 124 Configuration conf = thread.getMaster().getConfiguration(); 125 assertEquals(1000, conf.getInt("hbase.custom.config", 0)); 126 }); 127 128 // Check the configuration of the RegionServers 129 TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { 130 Configuration conf = thread.getRegionServer().getConfiguration(); 131 assertEquals(1000, conf.getInt("hbase.custom.config", 0)); 132 }); 133 134 restoreHBaseSiteXML(); 135 } 136 137 @Test 138 public void testRollWALWALWriter() throws Exception { 139 setUpforLogRolling(); 140 String className = this.getClass().getName(); 141 StringBuilder v = new StringBuilder(className); 142 while (v.length() < 1000) { 143 v.append(className); 144 } 145 byte[] value = Bytes.toBytes(v.toString()); 146 HRegionServer regionServer = startAndWriteData(tableName, value); 147 LOG.info("after writing there are " 148 + AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files"); 149 150 // flush all regions 151 for (HRegion r : regionServer.getOnlineRegionsLocalContext()) { 152 r.flush(true); 153 } 154 admin.rollWALWriter(regionServer.getServerName()).join(); 155 TEST_UTIL.waitFor(5000, () -> { 156 int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)); 157 LOG.info("after flushing all regions and rolling logs there are " + count + " log files"); 158 return count <= 2; 159 }); 160 } 161 162 private void setUpforLogRolling() { 163 // Force a region split after every 768KB 164 TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L); 165 166 // We roll the log after every 32 writes 167 TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32); 168 169 TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2); 170 TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000); 171 172 // For less frequently updated regions flush after every 2 flushes 173 TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2); 174 175 // We flush the cache after every 8192 bytes 176 TEST_UTIL.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192); 177 178 // Increase the amount of time between client retries 179 TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000); 180 181 // Reduce thread wake frequency so that other threads can get 182 // a chance to run. 183 TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000); 184 185 /**** configuration for testLogRollOnDatanodeDeath ****/ 186 // lower the namenode & datanode heartbeat so the namenode 187 // quickly detects datanode failures 188 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000); 189 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); 190 // the namenode might still try to choose the recently-dead datanode 191 // for a pipeline, so try to a new pipeline multiple times 192 TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30); 193 TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2); 194 TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3); 195 } 196 197 private HRegionServer startAndWriteData(TableName tableName, byte[] value) throws Exception { 198 createTableWithDefaultConf(tableName); 199 AsyncTable<?> table = ASYNC_CONN.getTable(tableName); 200 HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); 201 for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls 202 Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i))); 203 put.addColumn(FAMILY, null, value); 204 table.put(put).join(); 205 if (i % 32 == 0) { 206 // After every 32 writes sleep to let the log roller run 207 try { 208 Thread.sleep(2000); 209 } catch (InterruptedException e) { 210 // continue 211 } 212 } 213 } 214 return regionServer; 215 } 216 217 @Test 218 public void testGetRegionLoads() throws Exception { 219 // Turn off the balancer 220 admin.balancerSwitch(false).join(); 221 TableName[] tables = new TableName[] { TableName.valueOf(tableName.getNameAsString() + "1"), 222 TableName.valueOf(tableName.getNameAsString() + "2"), 223 TableName.valueOf(tableName.getNameAsString() + "3") }; 224 createAndLoadTable(tables); 225 // Sleep to wait region server report 226 Thread 227 .sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2); 228 // Check if regions match with the regionLoad from the server 229 Collection<ServerName> servers = admin.getRegionServers().get(); 230 for (ServerName serverName : servers) { 231 List<RegionInfo> regions = admin.getRegions(serverName).get(); 232 checkRegionsAndRegionLoads(regions, admin.getRegionMetrics(serverName).get()); 233 } 234 235 // Check if regionLoad matches the table's regions and nothing is missed 236 for (TableName table : tables) { 237 List<RegionInfo> tableRegions = admin.getRegions(table).get(); 238 List<RegionMetrics> regionLoads = Lists.newArrayList(); 239 for (ServerName serverName : servers) { 240 regionLoads.addAll(admin.getRegionMetrics(serverName, table).get()); 241 } 242 checkRegionsAndRegionLoads(tableRegions, regionLoads); 243 } 244 245 // Check RegionLoad matches the regionLoad from ClusterStatus 246 ClusterMetrics clusterStatus = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).get(); 247 assertEquals(servers.size(), clusterStatus.getLiveServerMetrics().size()); 248 for (Map.Entry<ServerName, ServerMetrics> entry : clusterStatus.getLiveServerMetrics() 249 .entrySet()) { 250 ServerName sn = entry.getKey(); 251 ServerMetrics sm = entry.getValue(); 252 compareRegionLoads(sm.getRegionMetrics().values(), admin.getRegionMetrics(sn).get()); 253 } 254 for (ServerName serverName : clusterStatus.getLiveServerMetrics().keySet()) { 255 ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(serverName); 256 257 } 258 } 259 260 @Test 261 public void testGetRegionServers() throws Exception { 262 List<ServerName> serverNames = new ArrayList<>(admin.getRegionServers(true).get()); 263 assertEquals(2, serverNames.size()); 264 265 List<ServerName> serversToDecom = new ArrayList<>(); 266 ServerName serverToDecommission = serverNames.get(0); 267 268 serversToDecom.add(serverToDecommission); 269 admin.decommissionRegionServers(serversToDecom, false).join(); 270 271 assertEquals(1, admin.getRegionServers(true).get().size()); 272 assertEquals(2, admin.getRegionServers(false).get().size()); 273 274 admin.recommissionRegionServer(serverToDecommission, Collections.emptyList()).join(); 275 276 assertEquals(2, admin.getRegionServers(true).get().size()); 277 assertEquals(2, admin.getRegionServers(false).get().size()); 278 } 279 280 private void compareRegionLoads(Collection<RegionMetrics> regionLoadCluster, 281 Collection<RegionMetrics> regionLoads) { 282 283 assertEquals("No of regionLoads from clusterStatus and regionloads from RS doesn't match", 284 regionLoadCluster.size(), regionLoads.size()); 285 286 for (RegionMetrics loadCluster : regionLoadCluster) { 287 boolean matched = false; 288 for (RegionMetrics load : regionLoads) { 289 if (Bytes.equals(loadCluster.getRegionName(), load.getRegionName())) { 290 matched = true; 291 continue; 292 } 293 } 294 assertTrue("The contents of region load from cluster and server should match", matched); 295 } 296 } 297 298 private void checkRegionsAndRegionLoads(Collection<RegionInfo> regions, 299 Collection<RegionMetrics> regionLoads) { 300 301 assertEquals("No of regions and regionloads doesn't match", regions.size(), regionLoads.size()); 302 303 Map<byte[], RegionMetrics> regionLoadMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); 304 for (RegionMetrics regionLoad : regionLoads) { 305 regionLoadMap.put(regionLoad.getRegionName(), regionLoad); 306 } 307 for (RegionInfo info : regions) { 308 assertTrue("Region not in regionLoadMap region:" + info.getRegionNameAsString() 309 + " regionMap: " + regionLoadMap, regionLoadMap.containsKey(info.getRegionName())); 310 } 311 } 312 313 private void createAndLoadTable(TableName[] tables) { 314 for (TableName table : tables) { 315 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table); 316 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)); 317 admin.createTable(builder.build(), Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), 16).join(); 318 AsyncTable<?> asyncTable = ASYNC_CONN.getTable(table); 319 List<Put> puts = new ArrayList<>(); 320 for (byte[] row : HBaseTestingUtil.ROWS) { 321 puts.add(new Put(row).addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v"))); 322 } 323 asyncTable.putAll(puts).join(); 324 } 325 } 326}