/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseRpcServicesBase;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Manage the bootstrap node list at the region server side.
 * <p/>
 * It first requests the master for the initial set of bootstrap nodes (a subset of the live
 * region servers), and then it exchanges bootstrap nodes with other bootstrap nodes. In most
 * cases, if the cluster is stable, we do not need to request the master again until we reach the
 * request master interval. If the current number of bootstrap nodes is not enough, we will
 * request the master sooner.
 * <p/>
 * The algorithm is very simple, as we always fall back to requesting the master. The trick here
 * is that, if we cannot get enough bootstrap nodes from the master, the cluster must be small, so
 * always requesting the master will not put too much pressure on it. For large clusters, we will
 * soon get enough bootstrap nodes and stop requesting the master.
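 * <p/>
 * A minimal usage sketch, for illustration only; it assumes an {@link AsyncClusterConnection}
 * {@code conn} and a {@link MasterAddressTracker} {@code masterAddrTracker} are already
 * available:
 *
 * <pre>
 * {@code
 * // refreshes are scheduled internally on a single daemon thread
 * BootstrapNodeManager manager = new BootstrapNodeManager(conn, masterAddrTracker);
 * // current unmodifiable snapshot of the bootstrap node list
 * List<ServerName> nodes = manager.getBootstrapNodes();
 * // shut down the internal scheduler when the region server stops
 * manager.stop();
 * }
 * </pre>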
 */
@InterfaceAudience.Private
public class BootstrapNodeManager {

  private static final Logger LOG = LoggerFactory.getLogger(BootstrapNodeManager.class);

  public static final String REQUEST_MASTER_INTERVAL_SECS =
    "hbase.server.bootstrap.request_master_interval.secs";

  // default: request master every 10 minutes
  public static final long DEFAULT_REQUEST_MASTER_INTERVAL_SECS = TimeUnit.MINUTES.toSeconds(10);

  public static final String REQUEST_MASTER_MIN_INTERVAL_SECS =
    "hbase.server.bootstrap.request_master_min_interval.secs";

  // default: 30 seconds
  public static final long DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS = 30;

  public static final String REQUEST_REGIONSERVER_INTERVAL_SECS =
    "hbase.server.bootstrap.request_regionserver_interval.secs";

  // default: request region servers every 30 seconds
  public static final long DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS = 30;

  private static final float JITTER = 0.2f;

  private volatile List<ServerName> nodes = Collections.emptyList();

  private final AsyncClusterConnection conn;

  private final MasterAddressTracker masterAddrTracker;

  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(getClass().getSimpleName()).build());

  private final long requestMasterIntervalSecs;

  private final long requestMasterMinIntervalSecs;

  private final long requestRegionServerIntervalSecs;

  private final int maxNodeCount;

  private final RetryCounterFactory retryCounterFactory;

  private RetryCounter retryCounter;

  private long lastRequestMasterTime;

  public BootstrapNodeManager(AsyncClusterConnection conn, MasterAddressTracker masterAddrTracker) {
    this.conn = conn;
    this.masterAddrTracker = masterAddrTracker;
    Configuration conf = conn.getConfiguration();
    requestMasterIntervalSecs =
      conf.getLong(REQUEST_MASTER_INTERVAL_SECS, DEFAULT_REQUEST_MASTER_INTERVAL_SECS);
    requestMasterMinIntervalSecs =
      conf.getLong(REQUEST_MASTER_MIN_INTERVAL_SECS, DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS);
    requestRegionServerIntervalSecs =
      conf.getLong(REQUEST_REGIONSERVER_INTERVAL_SECS, DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS);
    maxNodeCount = conf.getInt(HBaseRpcServicesBase.CLIENT_BOOTSTRAP_NODE_LIMIT,
      HBaseRpcServicesBase.DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
    retryCounterFactory = new RetryCounterFactory(
      new RetryConfig().setBackoffPolicy(new ExponentialBackoffPolicyWithLimit()).setJitter(JITTER)
        .setSleepInterval(requestMasterMinIntervalSecs).setMaxSleepTime(requestMasterIntervalSecs)
        .setTimeUnit(TimeUnit.SECONDS));
    executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs),
      TimeUnit.SECONDS);
  }

  // add up to JITTER (20%) of random extra delay so that region servers do not all hit the master
  // or each other at the same time
  private long getDelay(long delay) {
    long jitterDelay = (long) (delay * ThreadLocalRandom.current().nextFloat() * JITTER);
    return delay + jitterDelay;
  }

  private void getFromMaster() {
    List<ServerName> liveRegionServers;
    try {
      // request twice the max node count from the master
      liveRegionServers =
        FutureUtils.get(conn.getLiveRegionServers(masterAddrTracker, maxNodeCount * 2));
    } catch (IOException e) {
      LOG.warn("failed to get live region servers from master", e);
      if (retryCounter == null) {
        retryCounter = retryCounterFactory.create();
      }
      executor.schedule(this::getFromMaster, retryCounter.getBackoffTimeAndIncrementAttempts(),
        TimeUnit.SECONDS);
      return;
    }
    retryCounter = null;
    lastRequestMasterTime = EnvironmentEdgeManager.currentTime();
    this.nodes = Collections.unmodifiableList(liveRegionServers);
    if (liveRegionServers.size() < maxNodeCount) {
      // If the number of live region servers is small, the cluster is small too, so requesting the
      // master at a higher frequency is not a big problem; in this case we always request the
      // master to get the live region servers as bootstrap nodes.
      executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs),
        TimeUnit.SECONDS);
      return;
    }
    // schedule tasks to exchange bootstrap nodes with other region servers
    executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs),
      TimeUnit.SECONDS);
  }

  // this method is also used to test whether a given region server is still alive
  private void getFromRegionServer() {
    if (
      EnvironmentEdgeManager.currentTime() - lastRequestMasterTime
          >= TimeUnit.SECONDS.toMillis(requestMasterIntervalSecs)
    ) {
      // schedule a get-from-master task immediately if we have not requested the master for more
      // than requestMasterIntervalSecs
      executor.execute(this::getFromMaster);
      return;
    }
    List<ServerName> currentList = this.nodes;
    ServerName peer = currentList.get(ThreadLocalRandom.current().nextInt(currentList.size()));
    List<ServerName> otherList;
    try {
      otherList = FutureUtils.get(conn.getAllBootstrapNodes(peer));
    } catch (IOException e) {
      LOG.warn("failed to request region server {}", peer, e);
      // remove this region server from the list since it cannot respond successfully
      List<ServerName> newList = currentList.stream().filter(sn -> sn != peer)
        .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
      this.nodes = newList;
      if (newList.size() < maxNodeCount) {
        // schedule a get-from-master task immediately
        executor.execute(this::getFromMaster);
      } else {
        executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs),
          TimeUnit.SECONDS);
      }
      return;
    }
    // randomly select a new live region server list
    Set<ServerName> newRegionServers = new HashSet<ServerName>(currentList);
    newRegionServers.addAll(otherList);
    List<ServerName> newList = new ArrayList<ServerName>(newRegionServers);
    Collections.shuffle(newList, ThreadLocalRandom.current());
    // cap the final list at twice the max node count
    int expectedListSize = maxNodeCount * 2;
    if (newList.size() <= expectedListSize) {
      this.nodes = Collections.unmodifiableList(newList);
    } else {
      this.nodes =
        Collections.unmodifiableList(new ArrayList<>(newList.subList(0, expectedListSize)));
    }
    // schedule a new get-from-region-server task
    executor.schedule(this::getFromRegionServer, requestRegionServerIntervalSecs, TimeUnit.SECONDS);
  }

  public void stop() {
    executor.shutdownNow();
  }

  public List<ServerName> getBootstrapNodes() {
    return nodes;
  }
}