/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Manage the bootstrap node list at region server side.
 * <p/>
 * It will request master first to get the initial set of bootstrap nodes(a sub set of live region
 * servers), and then it will exchange the bootstrap nodes with other bootstrap nodes. In most
 * cases, if the cluster is stable, we do not need to request master again until we reach the
 * request master interval. And if the current number of bootstrap nodes is not enough, we will
 * request master soon.
 * <p/>
 * The algorithm is very simple, as we will always fallback to request master. The trick here is
 * that, if we can not get enough bootstrap nodes from master, then the cluster will be small, so it
 * will not put too much pressure on master if we always request master. And for large clusters, we
 * will soon get enough bootstrap nodes and stop requesting master.
 * <p/>
 * All refresh work is submitted to a single-threaded scheduled executor; the current node list is
 * published through a volatile field holding an unmodifiable list.
 */
@InterfaceAudience.Private
public class BootstrapNodeManager {

  private static final Logger LOG = LoggerFactory.getLogger(BootstrapNodeManager.class);

  public static final String REQUEST_MASTER_INTERVAL_SECS =
    "hbase.server.bootstrap.request_master_interval.secs";

  // default request every 10 minutes
  public static final long DEFAULT_REQUEST_MASTER_INTERVAL_SECS = TimeUnit.MINUTES.toSeconds(10);

  public static final String REQUEST_MASTER_MIN_INTERVAL_SECS =
    "hbase.server.bootstrap.request_master_min_interval.secs";

  // default 30 seconds
  public static final long DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS = 30;

  public static final String REQUEST_REGIONSERVER_INTERVAL_SECS =
    "hbase.server.bootstrap.request_regionserver_interval.secs";

  // default request every 30 seconds
  public static final long DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS = 30;

  // upper bound, as a fraction of the base delay, of the random extra delay added by getDelay and
  // passed to the retry configuration
  private static final float JITTER = 0.2f;

  // always replaced as a whole with an unmodifiable list, never mutated in place
  private volatile List<ServerName> nodes = Collections.emptyList();

  private final ClusterConnection conn;

  private final MasterAddressTracker masterAddrTracker;

  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(getClass().getSimpleName()).build());

  private final long requestMasterIntervalSecs;

  private final long requestMasterMinIntervalSecs;

  private final long requestRegionServerIntervalSecs;

  private final int maxNodeCount;

  private final RetryCounterFactory retryCounterFactory;

  // the two fields below are only read and written by the tasks submitted to executor
  private RetryCounter retryCounter;

  private long lastRequestMasterTime;

  /**
   * Reads the intervals and the node limit from the connection's configuration and schedules the
   * first request to master after a jittered {@link #REQUEST_MASTER_MIN_INTERVAL_SECS} delay.
   * @param conn              connection used to contact master and other region servers; also the
   *                          source of the configuration
   * @param masterAddrTracker used to look up the current master address for each master request
   */
  public BootstrapNodeManager(ClusterConnection conn, MasterAddressTracker masterAddrTracker) {
    this.conn = conn;
    this.masterAddrTracker = masterAddrTracker;
    Configuration conf = conn.getConfiguration();
    requestMasterIntervalSecs =
      conf.getLong(REQUEST_MASTER_INTERVAL_SECS, DEFAULT_REQUEST_MASTER_INTERVAL_SECS);
    requestMasterMinIntervalSecs =
      conf.getLong(REQUEST_MASTER_MIN_INTERVAL_SECS, DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS);
    requestRegionServerIntervalSecs =
      conf.getLong(REQUEST_REGIONSERVER_INTERVAL_SECS, DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS);
    maxNodeCount = conf.getInt(RSRpcServices.CLIENT_BOOTSTRAP_NODE_LIMIT,
      RSRpcServices.DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
    // failed master requests back off starting from the min interval, capped at the normal
    // request master interval, both expressed in seconds
    retryCounterFactory = new RetryCounterFactory(
      new RetryConfig().setBackoffPolicy(new ExponentialBackoffPolicyWithLimit()).setJitter(JITTER)
        .setSleepInterval(requestMasterMinIntervalSecs).setMaxSleepTime(requestMasterIntervalSecs)
        .setTimeUnit(TimeUnit.SECONDS));
    executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs),
      TimeUnit.SECONDS);
  }

  /**
   * Adds a random extra of at most {@link #JITTER} times the given delay, truncated to a whole
   * number.
   * @param delay the base delay
   * @return the base delay plus the random extra, in the same unit as {@code delay}
   */
  private long getDelay(long delay) {
    long jitterDelay = (long) (delay * ThreadLocalRandom.current().nextFloat() * JITTER);
    return delay + jitterDelay;
  }

  /**
   * Asks master for live region servers, publishes them as the bootstrap node list and schedules
   * the next step: another master request if the request failed or the list is too small, an
   * exchange with a region server otherwise.
   */
  private void getFromMaster() {
    List<ServerName> liveRegionServers;
    try {
      // get 2 times number of node
      liveRegionServers =
        conn.getLiveRegionServers(() -> masterAddrTracker.getMasterAddress(), maxNodeCount * 2);
    } catch (IOException e) {
      LOG.warn("failed to get live region servers from master", e);
      if (retryCounter == null) {
        retryCounter = retryCounterFactory.create();
      }
      executor.schedule(this::getFromMaster, retryCounter.getBackoffTimeAndIncrementAttempts(),
        TimeUnit.SECONDS);
      return;
    }
    // a successful request resets the backoff state
    retryCounter = null;
    lastRequestMasterTime = EnvironmentEdgeManager.currentTime();
    this.nodes = Collections.unmodifiableList(liveRegionServers);
    // An empty list has no peer to exchange with, so it is handled like a too-small list even
    // when the configured limit is zero or negative.
    if (liveRegionServers.isEmpty() || liveRegionServers.size() < maxNodeCount) {
      // If the number of live region servers is small, it means the cluster is small, so requesting
      // master with a higher frequency will not be a big problem, so here we will always request
      // master to get the live region servers as bootstrap nodes.
      executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs),
        TimeUnit.SECONDS);
      return;
    }
    // schedule tasks to exchange the bootstrap nodes with other region servers.
    executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs),
      TimeUnit.SECONDS);
  }

  /**
   * Exchanges bootstrap nodes with one randomly chosen node of the current list, then schedules
   * the next step. A node which fails to respond is dropped from the list.
   */
  // this method is also used to test whether a given region server is still alive.
  private void getFromRegionServer() {
    if (
      EnvironmentEdgeManager.currentTime() - lastRequestMasterTime
          >= TimeUnit.SECONDS.toMillis(requestMasterIntervalSecs)
    ) {
      // schedule a get from master task immediately if haven't request master for more than
      // requestMasterIntervalSecs
      executor.execute(this::getFromMaster);
      return;
    }
    // read the volatile field once so that the whole method works on the same snapshot
    List<ServerName> currentList = this.nodes;
    if (currentList.isEmpty()) {
      // nextInt(0) below would throw IllegalArgumentException; the exception would escape this
      // task and nothing would reschedule the refresh, so fall back to master instead
      executor.execute(this::getFromMaster);
      return;
    }
    ServerName peer = currentList.get(ThreadLocalRandom.current().nextInt(currentList.size()));
    List<ServerName> otherList;
    try {
      otherList = conn.getAllBootstrapNodes(peer);
    } catch (IOException e) {
      LOG.warn("failed to request region server {}", peer, e);
      // remove this region server from the list since it can not respond successfully; compare
      // with equals so that the removal does not depend on object identity
      List<ServerName> newList = currentList.stream().filter(sn -> !peer.equals(sn))
        .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
      this.nodes = newList;
      if (newList.size() < maxNodeCount) {
        // schedule a get from master task immediately
        executor.execute(this::getFromMaster);
      } else {
        executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs),
          TimeUnit.SECONDS);
      }
      return;
    }
    // randomly select new live region server list
    Set<ServerName> newRegionServers = new HashSet<>(currentList);
    newRegionServers.addAll(otherList);
    List<ServerName> newList = new ArrayList<>(newRegionServers);
    Collections.shuffle(newList, ThreadLocalRandom.current());
    int expectedListSize = maxNodeCount * 2;
    if (newList.size() <= expectedListSize) {
      this.nodes = Collections.unmodifiableList(newList);
    } else {
      // copy the sub list so that the published list does not keep the larger backing list alive
      this.nodes =
        Collections.unmodifiableList(new ArrayList<>(newList.subList(0, expectedListSize)));
    }
    // schedule a new get from region server task, jittered like every other periodic reschedule
    executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs),
      TimeUnit.SECONDS);
  }

  /**
   * Stops refreshing the bootstrap nodes by shutting down the executor immediately.
   */
  public void stop() {
    executor.shutdownNow();
  }

  /**
   * Returns the most recently published bootstrap nodes.
   * @return an unmodifiable list, empty until the first successful request to master
   */
  public List<ServerName> getBootstrapNodes() {
    return nodes;
  }
}