001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Set;
026import java.util.concurrent.Executors;
027import java.util.concurrent.ScheduledExecutorService;
028import java.util.concurrent.ThreadLocalRandom;
029import java.util.concurrent.TimeUnit;
030import java.util.stream.Collectors;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.client.ClusterConnection;
034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
035import org.apache.hadoop.hbase.util.RetryCounter;
036import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
037import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
038import org.apache.hadoop.hbase.util.RetryCounterFactory;
039import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
045
046/**
047 * Manage the bootstrap node list at region server side.
048 * <p/>
049 * It will request master first to get the initial set of bootstrap nodes(a sub set of live region
050 * servers), and then it will exchange the bootstrap nodes with other bootstrap nodes. In most
051 * cases, if the cluster is stable, we do not need to request master again until we reach the
052 * request master interval. And if the current number of bootstrap nodes is not enough, we will
053 * request master soon.
054 * <p/>
055 * The algorithm is very simple, as we will always fallback to request master. THe trick here is
056 * that, if we can not get enough bootstrap nodes from master, then the cluster will be small, so it
057 * will not put too much pressure on master if we always request master. And for large clusters, we
058 * will soon get enough bootstrap nodes and stop requesting master.
059 */
060@InterfaceAudience.Private
061public class BootstrapNodeManager {
062
063  private static final Logger LOG = LoggerFactory.getLogger(BootstrapNodeManager.class);
064
065  public static final String REQUEST_MASTER_INTERVAL_SECS =
066    "hbase.server.bootstrap.request_master_interval.secs";
067
068  // default request every 10 minutes
069  public static final long DEFAULT_REQUEST_MASTER_INTERVAL_SECS = TimeUnit.MINUTES.toSeconds(10);
070
071  public static final String REQUEST_MASTER_MIN_INTERVAL_SECS =
072    "hbase.server.bootstrap.request_master_min_interval.secs";
073
074  // default 30 seconds
075  public static final long DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS = 30;
076
077  public static final String REQUEST_REGIONSERVER_INTERVAL_SECS =
078    "hbase.server.bootstrap.request_regionserver_interval.secs";
079
080  // default request every 30 seconds
081  public static final long DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS = 30;
082
083  private static final float JITTER = 0.2f;
084
085  private volatile List<ServerName> nodes = Collections.emptyList();
086
087  private final ClusterConnection conn;
088
089  private final MasterAddressTracker masterAddrTracker;
090
091  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
092    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(getClass().getSimpleName()).build());
093
094  private final long requestMasterIntervalSecs;
095
096  private final long requestMasterMinIntervalSecs;
097
098  private final long requestRegionServerIntervalSecs;
099
100  private final int maxNodeCount;
101
102  private final RetryCounterFactory retryCounterFactory;
103
104  private RetryCounter retryCounter;
105
106  private long lastRequestMasterTime;
107
108  public BootstrapNodeManager(ClusterConnection conn, MasterAddressTracker masterAddrTracker) {
109    this.conn = conn;
110    this.masterAddrTracker = masterAddrTracker;
111    Configuration conf = conn.getConfiguration();
112    requestMasterIntervalSecs =
113      conf.getLong(REQUEST_MASTER_INTERVAL_SECS, DEFAULT_REQUEST_MASTER_INTERVAL_SECS);
114    requestMasterMinIntervalSecs =
115      conf.getLong(REQUEST_MASTER_MIN_INTERVAL_SECS, DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS);
116    requestRegionServerIntervalSecs =
117      conf.getLong(REQUEST_REGIONSERVER_INTERVAL_SECS, DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS);
118    maxNodeCount = conf.getInt(RSRpcServices.CLIENT_BOOTSTRAP_NODE_LIMIT,
119      RSRpcServices.DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
120    retryCounterFactory = new RetryCounterFactory(
121      new RetryConfig().setBackoffPolicy(new ExponentialBackoffPolicyWithLimit()).setJitter(JITTER)
122        .setSleepInterval(requestMasterMinIntervalSecs).setMaxSleepTime(requestMasterIntervalSecs)
123        .setTimeUnit(TimeUnit.SECONDS));
124    executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs),
125      TimeUnit.SECONDS);
126  }
127
128  private long getDelay(long delay) {
129    long jitterDelay = (long) (delay * ThreadLocalRandom.current().nextFloat() * JITTER);
130    return delay + jitterDelay;
131  }
132
133  private void getFromMaster() {
134    List<ServerName> liveRegionServers;
135    try {
136      // get 2 times number of node
137      liveRegionServers =
138        conn.getLiveRegionServers(() -> masterAddrTracker.getMasterAddress(), maxNodeCount * 2);
139    } catch (IOException e) {
140      LOG.warn("failed to get live region servers from master", e);
141      if (retryCounter == null) {
142        retryCounter = retryCounterFactory.create();
143      }
144      executor.schedule(this::getFromMaster, retryCounter.getBackoffTimeAndIncrementAttempts(),
145        TimeUnit.SECONDS);
146      return;
147    }
148    retryCounter = null;
149    lastRequestMasterTime = EnvironmentEdgeManager.currentTime();
150    this.nodes = Collections.unmodifiableList(liveRegionServers);
151    if (liveRegionServers.size() < maxNodeCount) {
152      // If the number of live region servers is small, it means the cluster is small, so requesting
153      // master with a higher frequency will not be a big problem, so here we will always request
154      // master to get the live region servers as bootstrap nodes.
155      executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs),
156        TimeUnit.SECONDS);
157      return;
158    }
159    // schedule tasks to exchange the bootstrap nodes with other region servers.
160    executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs),
161      TimeUnit.SECONDS);
162  }
163
164  // this method is also used to test whether a given region server is still alive.
165  private void getFromRegionServer() {
166    if (
167      EnvironmentEdgeManager.currentTime() - lastRequestMasterTime
168          >= TimeUnit.SECONDS.toMillis(requestMasterIntervalSecs)
169    ) {
170      // schedule a get from master task immediately if haven't request master for more than
171      // requestMasterIntervalSecs
172      executor.execute(this::getFromMaster);
173      return;
174    }
175    List<ServerName> currentList = this.nodes;
176    ServerName peer = currentList.get(ThreadLocalRandom.current().nextInt(currentList.size()));
177    List<ServerName> otherList;
178    try {
179      otherList = conn.getAllBootstrapNodes(peer);
180    } catch (IOException e) {
181      LOG.warn("failed to request region server {}", peer, e);
182      // remove this region server from the list since it can not respond successfully
183      List<ServerName> newList = currentList.stream().filter(sn -> sn != peer)
184        .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
185      this.nodes = newList;
186      if (newList.size() < maxNodeCount) {
187        // schedule a get from master task immediately
188        executor.execute(this::getFromMaster);
189      } else {
190        executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs),
191          TimeUnit.SECONDS);
192      }
193      return;
194    }
195    // randomly select new live region server list
196    Set<ServerName> newRegionServers = new HashSet<ServerName>(currentList);
197    newRegionServers.addAll(otherList);
198    List<ServerName> newList = new ArrayList<ServerName>(newRegionServers);
199    Collections.shuffle(newList, ThreadLocalRandom.current());
200    int expectedListSize = maxNodeCount * 2;
201    if (newList.size() <= expectedListSize) {
202      this.nodes = Collections.unmodifiableList(newList);
203    } else {
204      this.nodes =
205        Collections.unmodifiableList(new ArrayList<>(newList.subList(0, expectedListSize)));
206    }
207    // schedule a new get from region server task
208    executor.schedule(this::getFromRegionServer, requestRegionServerIntervalSecs, TimeUnit.SECONDS);
209  }
210
211  public void stop() {
212    executor.shutdownNow();
213  }
214
215  public List<ServerName> getBootstrapNodes() {
216    return nodes;
217  }
218}