001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.BufferedInputStream;
021import java.io.BufferedOutputStream;
022import java.io.Closeable;
023import java.io.DataInputStream;
024import java.io.DataOutputStream;
025import java.io.File;
026import java.io.FileInputStream;
027import java.io.FileOutputStream;
028import java.io.IOException;
029import java.net.InetAddress;
030import java.nio.file.Files;
031import java.nio.file.Paths;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Collections;
035import java.util.EnumSet;
036import java.util.HashSet;
037import java.util.Iterator;
038import java.util.List;
039import java.util.Locale;
040import java.util.Optional;
041import java.util.Set;
042import java.util.concurrent.Callable;
043import java.util.concurrent.CancellationException;
044import java.util.concurrent.ExecutionException;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.Executors;
047import java.util.concurrent.Future;
048import java.util.concurrent.TimeUnit;
049import java.util.concurrent.TimeoutException;
050import java.util.function.Predicate;
051import org.apache.commons.io.IOUtils;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.hbase.ClusterMetrics.Option;
054import org.apache.hadoop.hbase.HBaseConfiguration;
055import org.apache.hadoop.hbase.HConstants;
056import org.apache.hadoop.hbase.HRegionLocation;
057import org.apache.hadoop.hbase.MetaTableAccessor;
058import org.apache.hadoop.hbase.ServerName;
059import org.apache.hadoop.hbase.UnknownRegionException;
060import org.apache.hadoop.hbase.client.Admin;
061import org.apache.hadoop.hbase.client.Connection;
062import org.apache.hadoop.hbase.client.ConnectionFactory;
063import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
064import org.apache.hadoop.hbase.client.RegionInfo;
065import org.apache.hadoop.hbase.client.RegionInfoBuilder;
066import org.apache.hadoop.hbase.client.Result;
067import org.apache.hadoop.hbase.master.RackManager;
068import org.apache.hadoop.hbase.master.RegionState;
069import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
070import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
071import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
072import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
073import org.apache.yetus.audience.InterfaceAudience;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
078import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
079import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
080
081/**
 * Tool for loading/unloading regions to/from a given regionserver. This tool can be run from the
 * command line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.
 * Ack mode acknowledges that regions are online after movement, while noAck mode is a best effort
 * mode that improves performance but will still move on if a region is stuck/not moved. The
 * motivation behind noAck mode is RS shutdown, where even if a region is stuck, the master will
 * move it anyway upon shutdown. This can also be used by constructing an object using the builder
 * and then calling the {@link #load()} or {@link #unload()} methods for the desired operations.
089 */
090@InterfaceAudience.Public
091public class RegionMover extends AbstractHBaseTool implements Closeable {
  /** Configuration key: maximum number of retries for a region move. */
  public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
  /**
   * Configuration key: per-region wait, in seconds. The move pools below wait for this value
   * multiplied by the number of regions being moved.
   */
  public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
  /** Configuration key for the server-start wait; not read in this part of the file. */
  public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
  /** Default for {@link #MOVE_RETRIES_MAX_KEY}. */
  public static final int DEFAULT_MOVE_RETRIES_MAX = 5;
  /** Default for {@link #MOVE_WAIT_MAX_KEY}. */
  public static final int DEFAULT_MOVE_WAIT_MAX = 60;
  /** Default for {@link #SERVERSTART_WAIT_MAX_KEY}. */
  public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;

  private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class);

  // NOTE(review): rmbuilder and loadUnload are not referenced in the visible part of the file;
  // presumably used by the command-line entry point -- confirm.
  private RegionMoverBuilder rmbuilder;
  // true: moves go through MoveWithAck; false: MoveWithoutAck (hbase:meta is always acked).
  private boolean ack = true;
  // Size of the fixed thread pools that execute region moves.
  private int maxthreads = 1;
  // Overall load/unload timeout, in seconds (see waitTaskToFinish).
  private int timeout;
  // Encoded names of the regions to isolate on the target server (see isolateRegions).
  private List<String> isolateRegionIdArray;
  private String loadUnload;
  // Host of the region server being loaded/unloaded; may be rewritten to an IP in the constructor.
  private String hostname;
  // File the moved regions are written to on unload and read back from on load.
  private String filename;
  // File listing servers that must not receive regions during unload.
  private String excludeFile;
  // File listing the only servers allowed to receive regions during unload.
  private String designatedFile;
  // Port of the region server identified by hostname.
  private int port;
  // Cluster connection and admin; both are released in close().
  private Connection conn;
  private Admin admin;
  // Resolves racks for unloadFromRack(); replaceable through the builder for tests.
  private RackManager rackManager;
115
116  private RegionMover(RegionMoverBuilder builder) throws IOException {
117    this.hostname = builder.hostname;
118    this.filename = builder.filename;
119    this.excludeFile = builder.excludeFile;
120    this.designatedFile = builder.designatedFile;
121    this.maxthreads = builder.maxthreads;
122    this.isolateRegionIdArray = builder.isolateRegionIdArray;
123    this.ack = builder.ack;
124    this.port = builder.port;
125    this.timeout = builder.timeout;
126    setConf(builder.conf);
127    this.conn = ConnectionFactory.createConnection(conf);
128    this.admin = conn.getAdmin();
129
130    // if the hostname of master is ip, it indicates that the master/RS has enabled use-ip, we need
131    // to resolve the current hostname to ip to ensure that the RegionMover logic can be executed
132    // normally, see HBASE-27304 for details.
133    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
134    if (InetAddresses.isInetAddress(master.getHostname())) {
135      if (!InetAddresses.isInetAddress(this.hostname)) {
136        this.hostname = InetAddress.getByName(this.hostname).getHostAddress();
137      }
138    }
139
140    // Only while running unit tests, builder.rackManager will not be null for the convenience of
141    // providing custom rackManager. Otherwise for regular workflow/user triggered action,
142    // builder.rackManager is supposed to be null. Hence, setter of builder.rackManager is
143    // provided as @InterfaceAudience.Private and it is commented that this is just
144    // to be used by unit test.
145    rackManager = builder.rackManager == null ? new RackManager(conf) : builder.rackManager;
146  }
147
148  private RegionMover() {
149  }
150
151  @Override
152  public void close() {
153    IOUtils.closeQuietly(this.admin, e -> LOG.warn("failed to close admin", e));
154    IOUtils.closeQuietly(this.conn, e -> LOG.warn("failed to close conn", e));
155  }
156
157  /**
158   * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has
159   * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
160   * {@link #ack(boolean)}, {@link #timeout(int)}, {@link #designatedFile(String)} methods to set
161   * the corresponding options.
162   */
163  public static class RegionMoverBuilder {
164    private boolean ack = true;
165    private int maxthreads = 1;
166    private int timeout = Integer.MAX_VALUE;
167    private List<String> isolateRegionIdArray = new ArrayList<>();
168    private String hostname;
169    private String filename;
170    private String excludeFile = null;
171    private String designatedFile = null;
172    private String defaultDir = System.getProperty("java.io.tmpdir");
173    @InterfaceAudience.Private
174    final int port;
175    private final Configuration conf;
176    private RackManager rackManager;
177
178    public RegionMoverBuilder(String hostname) {
179      this(hostname, createConf());
180    }
181
182    /**
183     * Creates a new configuration and sets region mover specific overrides
184     */
185    private static Configuration createConf() {
186      Configuration conf = HBaseConfiguration.create();
187      conf.setInt("hbase.client.prefetch.limit", 1);
188      conf.setInt("hbase.client.pause", 500);
189      conf.setInt("hbase.client.retries.number", 100);
190      return conf;
191    }
192
193    /**
194     * @param hostname Hostname to unload regions from or load regions to. Can be either hostname or
195     *                 hostname:port.
196     * @param conf     Configuration object
197     */
198    public RegionMoverBuilder(String hostname, Configuration conf) {
199      String[] splitHostname = hostname.toLowerCase().split(":");
200      this.hostname = splitHostname[0];
201      if (splitHostname.length == 2) {
202        this.port = Integer.parseInt(splitHostname[1]);
203      } else {
204        this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
205      }
206      this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname
207        + ":" + Integer.toString(this.port);
208      this.conf = conf;
209    }
210
211    /**
212     * Path of file where regions will be written to during unloading/read from during loading
213     * @return RegionMoverBuilder object
214     */
215    public RegionMoverBuilder filename(String filename) {
216      this.filename = filename;
217      return this;
218    }
219
220    /**
221     * Set the max number of threads that will be used to move regions
222     */
223    public RegionMoverBuilder maxthreads(int threads) {
224      this.maxthreads = threads;
225      return this;
226    }
227
228    /**
229     * Set the region ID to isolate on the region server.
230     */
231    public RegionMoverBuilder isolateRegionIdArray(List<String> isolateRegionIdArray) {
232      this.isolateRegionIdArray = isolateRegionIdArray;
233      return this;
234    }
235
236    /**
237     * Path of file containing hostnames to be excluded during region movement. Exclude file should
238     * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single
239     * host.
240     */
241    public RegionMoverBuilder excludeFile(String excludefile) {
242      this.excludeFile = excludefile;
243      return this;
244    }
245
246    /**
247     * Set the designated file. Designated file contains hostnames where region moves. Designated
248     * file should have 'host:port' per line. Port is mandatory here as we can have many RS running
249     * on a single host.
250     * @param designatedFile The designated file
251     * @return RegionMoverBuilder object
252     */
253    public RegionMoverBuilder designatedFile(String designatedFile) {
254      this.designatedFile = designatedFile;
255      return this;
256    }
257
258    /**
259     * Set ack/noAck mode.
260     * <p>
261     * In ack mode regions are acknowledged before and after moving and the move is retried
262     * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best
263     * effort mode,each region movement is tried once.This can be used during graceful shutdown as
264     * even if we have a stuck region,upon shutdown it'll be reassigned anyway.
265     * <p>
266     * @return RegionMoverBuilder object
267     */
268    public RegionMoverBuilder ack(boolean ack) {
269      this.ack = ack;
270      return this;
271    }
272
273    /**
274     * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for
275     * movers also have a separate time which is hbase.move.wait.max * number of regions to
276     * load/unload
277     * @param timeout in seconds
278     * @return RegionMoverBuilder object
279     */
280    public RegionMoverBuilder timeout(int timeout) {
281      this.timeout = timeout;
282      return this;
283    }
284
285    /**
286     * Set specific rackManager implementation. This setter method is for testing purpose only.
287     * @param rackManager rackManager impl
288     * @return RegionMoverBuilder object
289     */
290    @InterfaceAudience.Private
291    public RegionMoverBuilder rackManager(RackManager rackManager) {
292      this.rackManager = rackManager;
293      return this;
294    }
295
296    /**
297     * This method builds the appropriate RegionMover object which can then be used to load/unload
298     * using load and unload methods
299     * @return RegionMover object
300     */
301    public RegionMover build() throws IOException {
302      return new RegionMover(this);
303    }
304  }
305
306  /**
307   * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
308   * Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
309   * @return true if loading succeeded, false otherwise
310   */
311  public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
312    ExecutorService loadPool = Executors.newFixedThreadPool(1);
313    Future<Boolean> loadTask = loadPool.submit(getMetaRegionMovePlan());
314    boolean isMetaMoved = waitTaskToFinish(loadPool, loadTask, "loading");
315    if (!isMetaMoved) {
316      return false;
317    }
318    loadPool = Executors.newFixedThreadPool(1);
319    loadTask = loadPool.submit(getNonMetaRegionsMovePlan());
320    return waitTaskToFinish(loadPool, loadTask, "loading");
321  }
322
323  private Callable<Boolean> getMetaRegionMovePlan() {
324    return getRegionsMovePlan(true);
325  }
326
327  private Callable<Boolean> getNonMetaRegionsMovePlan() {
328    return getRegionsMovePlan(false);
329  }
330
331  private Callable<Boolean> getRegionsMovePlan(boolean moveMetaRegion) {
332    return () -> {
333      try {
334        List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
335        if (regionsToMove.isEmpty()) {
336          LOG.info("No regions to load.Exiting");
337          return true;
338        }
339        Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
340        if (moveMetaRegion) {
341          if (metaRegion.isPresent()) {
342            loadRegions(Collections.singletonList(metaRegion.get()));
343          }
344        } else {
345          metaRegion.ifPresent(regionsToMove::remove);
346          loadRegions(regionsToMove);
347        }
348      } catch (Exception e) {
349        LOG.error("Error while loading regions to " + hostname, e);
350        return false;
351      }
352      return true;
353    };
354  }
355
356  private Optional<RegionInfo> getMetaRegionInfoIfToBeMoved(List<RegionInfo> regionsToMove) {
357    return regionsToMove.stream().filter(RegionInfo::isMetaRegion).findFirst();
358  }
359
360  private void loadRegions(List<RegionInfo> regionsToMove) throws Exception {
361    ServerName server = getTargetServer();
362    List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
363    LOG.info("Moving " + regionsToMove.size() + " regions to " + server + " using "
364      + this.maxthreads + " threads.Ack mode:" + this.ack);
365
366    final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
367    List<Future<Boolean>> taskList = new ArrayList<>();
368    int counter = 0;
369    while (counter < regionsToMove.size()) {
370      RegionInfo region = regionsToMove.get(counter);
371      ServerName currentServer = MoveWithAck.getServerNameForRegion(region, admin, conn);
372      if (currentServer == null) {
373        LOG
374          .warn("Could not get server for Region:" + region.getRegionNameAsString() + " moving on");
375        counter++;
376        continue;
377      } else if (server.equals(currentServer)) {
378        LOG.info(
379          "Region " + region.getRegionNameAsString() + " is already on target server=" + server);
380        counter++;
381        continue;
382      }
383      if (ack) {
384        Future<Boolean> task = moveRegionsPool
385          .submit(new MoveWithAck(conn, region, currentServer, server, movedRegions));
386        taskList.add(task);
387      } else {
388        Future<Boolean> task = moveRegionsPool
389          .submit(new MoveWithoutAck(admin, region, currentServer, server, movedRegions));
390        taskList.add(task);
391      }
392      counter++;
393    }
394
395    moveRegionsPool.shutdown();
396    long timeoutInSeconds = regionsToMove.size()
397      * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
398    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
399  }
400
401  /**
402   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
403   * noAck mode we do not make sure that region is successfully online on the target region
404   * server,hence it is best effort.We do not unload regions to hostnames given in
405   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
406   * to hostnames provided in {@link #designatedFile}
407   * @return true if unloading succeeded, false otherwise
408   */
409  public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
410    return unloadRegions(false);
411  }
412
413  /**
414   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
415   * noAck mode we do not make sure that region is successfully online on the target region
416   * server,hence it is best effort.We do not unload regions to hostnames given in
417   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
418   * to hostnames provided in {@link #designatedFile}. While unloading regions, destination
419   * RegionServers are selected from different rack i.e regions should not move to any RegionServers
420   * that belong to same rack as source RegionServer.
421   * @return true if unloading succeeded, false otherwise
422   */
423  public boolean unloadFromRack()
424    throws InterruptedException, ExecutionException, TimeoutException {
425    return unloadRegions(true);
426  }
427
428  private boolean unloadRegions(boolean unloadFromRack)
429    throws ExecutionException, InterruptedException, TimeoutException {
430    return unloadRegions(unloadFromRack, null);
431  }
432
433  /**
434   * Isolated regions specified in {@link #isolateRegionIdArray} on {@link #hostname} in ack Mode
435   * and Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.
436   * In noAck mode we do not make sure that region is successfully online on the target region
437   * server,hence it is the best effort. We do not unload regions to hostnames given in
438   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
439   * to hostnames provided in {@link #designatedFile}
440   * @return true if region isolation succeeded, false otherwise
441   */
442  public boolean isolateRegions()
443    throws ExecutionException, InterruptedException, TimeoutException {
444    return unloadRegions(false, isolateRegionIdArray);
445  }
446
447  private boolean unloadRegions(boolean unloadFromRack, List<String> isolateRegionIdArray)
448    throws InterruptedException, ExecutionException, TimeoutException {
449    deleteFile(this.filename);
450    ExecutorService unloadPool = Executors.newFixedThreadPool(1);
451    Future<Boolean> unloadTask = unloadPool.submit(() -> {
452      List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
453      try {
454        // Get Online RegionServers
455        List<ServerName> regionServers = new ArrayList<>();
456        regionServers.addAll(admin.getRegionServers());
457        // Remove the host Region server from target Region Servers list
458        ServerName server = stripServer(regionServers, hostname, port);
459        if (server == null) {
460          LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.",
461            hostname, port);
462          LOG.debug("List of region servers: {}", regionServers);
463          return false;
464        }
465        // Remove RS not present in the designated file
466        includeExcludeRegionServers(designatedFile, regionServers, true);
467
468        // Remove RS present in the exclude file
469        includeExcludeRegionServers(excludeFile, regionServers, false);
470
471        if (unloadFromRack) {
472          // remove regionServers that belong to same rack (as source host) since the goal is to
473          // unload regions from source regionServer to destination regionServers
474          // that belong to different rack only.
475          String sourceRack = rackManager.getRack(server);
476          List<String> racks = rackManager.getRack(regionServers);
477          Iterator<ServerName> iterator = regionServers.iterator();
478          int i = 0;
479          while (iterator.hasNext()) {
480            iterator.next();
481            if (racks.size() > i && racks.get(i) != null && racks.get(i).equals(sourceRack)) {
482              iterator.remove();
483            }
484            i++;
485          }
486        }
487
488        // Remove decommissioned RS
489        Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers());
490        if (CollectionUtils.isNotEmpty(decommissionedRS)) {
491          regionServers.removeIf(decommissionedRS::contains);
492          LOG.debug("Excluded RegionServers from unloading regions to because they "
493            + "are marked as decommissioned. Servers: {}", decommissionedRS);
494        }
495
496        stripMaster(regionServers);
497        if (regionServers.isEmpty()) {
498          LOG.warn("No Regions were moved - no servers available");
499          return false;
500        }
501        unloadRegions(server, regionServers, movedRegions, isolateRegionIdArray);
502      } catch (Exception e) {
503        LOG.error("Error while unloading regions ", e);
504        return false;
505      } finally {
506        if (movedRegions != null) {
507          writeFile(filename, movedRegions);
508        }
509      }
510      return true;
511    });
512    return waitTaskToFinish(unloadPool, unloadTask, "unloading");
513  }
514
515  private void unloadRegions(ServerName server, List<ServerName> regionServers,
516    List<RegionInfo> movedRegions, List<String> isolateRegionIdArray) throws Exception {
517    while (true) {
518      List<RegionInfo> isolateRegionInfoList = Collections.synchronizedList(new ArrayList<>());
519      RegionInfo isolateRegionInfo = null;
520      if (isolateRegionIdArray != null && !isolateRegionIdArray.isEmpty()) {
521        // Region will be moved to target region server with Ack mode.
522        final ExecutorService isolateRegionPool = Executors.newFixedThreadPool(maxthreads);
523        List<Future<Boolean>> isolateRegionTaskList = new ArrayList<>();
524        List<RegionInfo> recentlyIsolatedRegion = Collections.synchronizedList(new ArrayList<>());
525        boolean allRegionOpsSuccessful = true;
526        boolean isMetaIsolated = false;
527        RegionInfo metaRegionInfo = RegionInfoBuilder.FIRST_META_REGIONINFO;
528        List<HRegionLocation> hRegionLocationRegionIsolation =
529          Collections.synchronizedList(new ArrayList<>());
530        for (String isolateRegionId : isolateRegionIdArray) {
531          if (isolateRegionId.equalsIgnoreCase(metaRegionInfo.getEncodedName())) {
532            isMetaIsolated = true;
533            continue;
534          }
535          Result result = MetaTableAccessor.scanByRegionEncodedName(conn, isolateRegionId);
536          HRegionLocation hRegionLocation =
537            MetaTableAccessor.getRegionLocation(conn, result.getRow());
538          if (hRegionLocation != null) {
539            hRegionLocationRegionIsolation.add(hRegionLocation);
540          } else {
541            LOG.error("Region " + isolateRegionId + " doesn't exists/can't fetch from"
542              + " meta...Quitting now");
543            // We only move the regions if all the regions were found.
544            allRegionOpsSuccessful = false;
545            break;
546          }
547        }
548
549        if (!allRegionOpsSuccessful) {
550          break;
551        }
552        // If hbase:meta region was isolated, then it needs to be part of isolateRegionInfoList.
553        if (isMetaIsolated) {
554          ZKWatcher zkWatcher = new ZKWatcher(conf, null, null);
555          List<HRegionLocation> result = new ArrayList<>();
556          for (String znode : zkWatcher.getMetaReplicaNodes()) {
557            String path = ZNodePaths.joinZNode(zkWatcher.getZNodePaths().baseZNode, znode);
558            int replicaId = zkWatcher.getZNodePaths().getMetaReplicaIdFromPath(path);
559            RegionState state = MetaTableLocator.getMetaRegionState(zkWatcher, replicaId);
560            result.add(new HRegionLocation(state.getRegion(), state.getServerName()));
561          }
562          ServerName metaSeverName = result.get(0).getServerName();
563          // For isolating hbase:meta, it should move explicitly in Ack mode,
564          // hence the forceMoveRegionByAck = true.
565          if (!metaSeverName.equals(server)) {
566            LOG.info("Region of hbase:meta " + metaRegionInfo.getEncodedName() + " is on server "
567              + metaSeverName + " moving to " + server);
568            submitRegionMovesWhileUnloading(metaSeverName, Collections.singletonList(server),
569              movedRegions, Collections.singletonList(metaRegionInfo), true);
570          } else {
571            LOG.info("Region of hbase:meta " + metaRegionInfo.getEncodedName() + " already exists"
572              + " on server : " + server);
573          }
574          isolateRegionInfoList.add(RegionInfoBuilder.FIRST_META_REGIONINFO);
575        }
576
577        if (!hRegionLocationRegionIsolation.isEmpty()) {
578          for (HRegionLocation hRegionLocation : hRegionLocationRegionIsolation) {
579            isolateRegionInfo = hRegionLocation.getRegion();
580            isolateRegionInfoList.add(isolateRegionInfo);
581            if (hRegionLocation.getServerName() == server) {
582              LOG.info("Region " + hRegionLocation.getRegion().getEncodedName() + " already exists"
583                + " on server : " + server.getHostname());
584            } else {
585              Future<Boolean> isolateRegionTask =
586                isolateRegionPool.submit(new MoveWithAck(conn, isolateRegionInfo,
587                  hRegionLocation.getServerName(), server, recentlyIsolatedRegion));
588              isolateRegionTaskList.add(isolateRegionTask);
589            }
590          }
591        }
592
593        if (!isolateRegionTaskList.isEmpty()) {
594          isolateRegionPool.shutdown();
595          // Now that we have fetched all the region's regionInfo, we can move them.
596          waitMoveTasksToFinish(isolateRegionPool, isolateRegionTaskList,
597            admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX));
598
599          Set<RegionInfo> currentRegionsOnTheServer = new HashSet<>(admin.getRegions(server));
600          if (!currentRegionsOnTheServer.containsAll(isolateRegionInfoList)) {
601            // If all the regions are not online on the target server,
602            // we don't put RS in decommission mode and exit from here.
603            LOG.error("One of the Region move failed OR stuck in transition...Quitting now");
604            break;
605          }
606        } else {
607          LOG.info("All regions already exists on server : " + server.getHostname());
608        }
609        // Once region has been moved to target RS, put the target RS into decommission mode,
610        // so master doesn't assign new region to the target RS while we unload the target RS.
611        // Also pass 'offload' flag as false since we don't want master to offload the target RS.
612        List<ServerName> listOfServer = new ArrayList<>();
613        listOfServer.add(server);
614        LOG.info("Putting server : " + server.getHostname() + " in decommission/draining mode");
615        admin.decommissionRegionServers(listOfServer, false);
616      }
617      List<RegionInfo> regionsToMove = admin.getRegions(server);
618      // Remove all the regions from the online Region list, that we just isolated.
619      // This will also include hbase:meta if it was isolated.
620      regionsToMove.removeAll(isolateRegionInfoList);
621      regionsToMove.removeAll(movedRegions);
622      if (regionsToMove.isEmpty()) {
623        LOG.info("No Regions to move....Quitting now");
624        break;
625      }
626      LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}",
627        regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack);
628
629      Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
630      if (metaRegion.isPresent()) {
631        RegionInfo meta = metaRegion.get();
632        // hbase:meta should move explicitly in Ack mode.
633        submitRegionMovesWhileUnloading(server, regionServers, movedRegions,
634          Collections.singletonList(meta), true);
635        regionsToMove.remove(meta);
636      }
637      submitRegionMovesWhileUnloading(server, regionServers, movedRegions, regionsToMove, false);
638    }
639  }
640
641  private void submitRegionMovesWhileUnloading(ServerName server, List<ServerName> regionServers,
642    List<RegionInfo> movedRegions, List<RegionInfo> regionsToMove, boolean forceMoveRegionByAck)
643    throws Exception {
644    final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
645    List<Future<Boolean>> taskList = new ArrayList<>();
646    int serverIndex = 0;
647    for (RegionInfo regionToMove : regionsToMove) {
648      // To move/isolate hbase:meta on a server, it should happen explicitly by Ack mode, hence the
649      // forceMoveRegionByAck = true.
650      if (ack || forceMoveRegionByAck) {
651        Future<Boolean> task = moveRegionsPool.submit(new MoveWithAck(conn, regionToMove, server,
652          regionServers.get(serverIndex), movedRegions));
653        taskList.add(task);
654      } else {
655        Future<Boolean> task = moveRegionsPool.submit(new MoveWithoutAck(admin, regionToMove,
656          server, regionServers.get(serverIndex), movedRegions));
657        taskList.add(task);
658      }
659      serverIndex = (serverIndex + 1) % regionServers.size();
660    }
661    moveRegionsPool.shutdown();
662    long timeoutInSeconds = regionsToMove.size()
663      * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
664    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
665  }
666
667  private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
668    throws TimeoutException, InterruptedException, ExecutionException {
669    pool.shutdown();
670    try {
671      if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
672        LOG.warn("Timed out before finishing the " + operation + " operation. Timeout: "
673          + this.timeout + "sec");
674        pool.shutdownNow();
675      }
676    } catch (InterruptedException e) {
677      pool.shutdownNow();
678      Thread.currentThread().interrupt();
679    }
680    try {
681      return task.get(5, TimeUnit.SECONDS);
682    } catch (InterruptedException e) {
683      LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e);
684      throw e;
685    } catch (ExecutionException e) {
686      LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e);
687      throw e;
688    }
689  }
690
691  private void waitMoveTasksToFinish(ExecutorService moveRegionsPool,
692    List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception {
693    try {
694      if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
695        moveRegionsPool.shutdownNow();
696      }
697    } catch (InterruptedException e) {
698      moveRegionsPool.shutdownNow();
699      Thread.currentThread().interrupt();
700    }
701    for (Future<Boolean> future : taskList) {
702      try {
703        // if even after shutdownNow threads are stuck we wait for 5 secs max
704        if (!future.get(5, TimeUnit.SECONDS)) {
705          LOG.error("Was Not able to move region....Exiting Now");
706          throw new Exception("Could not move region Exception");
707        }
708      } catch (InterruptedException e) {
709        LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
710        throw e;
711      } catch (ExecutionException e) {
712        boolean ignoreFailure = ignoreRegionMoveFailure(e);
713        if (ignoreFailure) {
714          LOG.debug("Ignore region move failure, it might have been split/merged.", e);
715        } else {
716          LOG.error("Got Exception From Thread While moving region {}", e.getMessage(), e);
717          throw e;
718        }
719      } catch (CancellationException e) {
720        LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
721          + "secs", e);
722        throw e;
723      }
724    }
725  }
726
727  private boolean ignoreRegionMoveFailure(ExecutionException e) {
728    boolean ignoreFailure = false;
729    if (e.getCause() instanceof UnknownRegionException) {
730      // region does not exist anymore
731      ignoreFailure = true;
732    } else if (
733      e.getCause() instanceof DoNotRetryRegionException && e.getCause().getMessage() != null
734        && e.getCause().getMessage()
735          .contains(AssignmentManager.UNEXPECTED_STATE_REGION + "state=SPLIT,")
736    ) {
737      // region is recently split
738      ignoreFailure = true;
739    }
740    return ignoreFailure;
741  }
742
743  private ServerName getTargetServer() throws Exception {
744    ServerName server = null;
745    int maxWaitInSeconds =
746      admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
747    long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000;
748    while (EnvironmentEdgeManager.currentTime() < maxWait) {
749      try {
750        List<ServerName> regionServers = new ArrayList<>();
751        regionServers.addAll(admin.getRegionServers());
752        // Remove the host Region server from target Region Servers list
753        server = stripServer(regionServers, hostname, port);
754        if (server != null) {
755          break;
756        } else {
757          LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting");
758        }
759      } catch (IOException e) {
760        LOG.warn("Could not get list of region servers", e);
761      }
762      Thread.sleep(500);
763    }
764    if (server == null) {
765      LOG.error("Server " + hostname + ":" + port + " is not up. Giving up.");
766      throw new Exception("Server " + hostname + ":" + port + " to load regions not online");
767    }
768    return server;
769  }
770
771  private List<RegionInfo> readRegionsFromFile(String filename) throws IOException {
772    List<RegionInfo> regions = new ArrayList<>();
773    File f = new File(filename);
774    if (!f.exists()) {
775      return regions;
776    }
777    try (
778      DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))) {
779      int numRegions = dis.readInt();
780      int index = 0;
781      while (index < numRegions) {
782        regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
783        index++;
784      }
785    } catch (IOException e) {
786      LOG.error("Error while reading regions from file:" + filename, e);
787      throw e;
788    }
789    return regions;
790  }
791
792  /**
793   * Write the number of regions moved in the first line followed by regions moved in subsequent
794   * lines
795   */
796  private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException {
797    try (DataOutputStream dos =
798      new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filename)))) {
799      dos.writeInt(movedRegions.size());
800      for (RegionInfo region : movedRegions) {
801        Bytes.writeByteArray(dos, RegionInfo.toByteArray(region));
802      }
803    } catch (IOException e) {
804      LOG.error("ERROR: Was Not able to write regions moved to output file but moved "
805        + movedRegions.size() + " regions", e);
806      throw e;
807    }
808  }
809
810  private void deleteFile(String filename) {
811    File f = new File(filename);
812    if (f.exists()) {
813      f.delete();
814    }
815  }
816
817  /**
818   * @param filename The file should have 'host:port' per line
819   * @return List of servers from the file in format 'hostname:port'.
820   */
821  private List<String> readServersFromFile(String filename) throws IOException {
822    List<String> servers = new ArrayList<>();
823    if (filename != null) {
824      try {
825        Files.readAllLines(Paths.get(filename)).stream().map(String::trim)
826          .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase)
827          .forEach(servers::add);
828      } catch (IOException e) {
829        LOG.error("Exception while reading servers from file,", e);
830        throw e;
831      }
832    }
833    return servers;
834  }
835
836  /**
837   * Designates or excludes the servername whose hostname and port portion matches the list given in
838   * the file. Example:<br>
839   * If you want to designated RSs, suppose designatedFile has RS1, regionServers has RS1, RS2 and
840   * RS3. When we call includeExcludeRegionServers(designatedFile, regionServers, true), RS2 and RS3
841   * are removed from regionServers list so that regions can move to only RS1. If you want to
842   * exclude RSs, suppose excludeFile has RS1, regionServers has RS1, RS2 and RS3. When we call
843   * includeExcludeRegionServers(excludeFile, servers, false), RS1 is removed from regionServers
844   * list so that regions can move to only RS2 and RS3.
845   */
846  private void includeExcludeRegionServers(String fileName, List<ServerName> regionServers,
847    boolean isInclude) throws IOException {
848    if (fileName != null) {
849      List<String> servers = readServersFromFile(fileName);
850      if (servers.isEmpty()) {
851        LOG.warn("No servers provided in the file: {}." + fileName);
852        return;
853      }
854      Iterator<ServerName> i = regionServers.iterator();
855      while (i.hasNext()) {
856        String rs = i.next().getServerName();
857        String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":"
858          + rs.split(ServerName.SERVERNAME_SEPARATOR)[1];
859        if (isInclude != servers.contains(rsPort)) {
860          i.remove();
861        }
862      }
863    }
864  }
865
866  /**
867   * Exclude master from list of RSs to move regions to
868   */
869  private void stripMaster(List<ServerName> regionServers) throws IOException {
870    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
871    stripServer(regionServers, master.getHostname(), master.getPort());
872  }
873
874  /**
875   * Remove the servername whose hostname and port portion matches from the passed array of servers.
876   * Returns as side-effect the servername removed.
877   * @return server removed from list of Region Servers
878   */
879  private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) {
880    for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
881      ServerName server = iter.next();
882      if (
883        server.getAddress().getHostName().equalsIgnoreCase(hostname)
884          && server.getAddress().getPort() == port
885      ) {
886        iter.remove();
887        return server;
888      }
889    }
890    return null;
891  }
892
893  @Override
894  protected void addOptions() {
895    this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
896    this.addRequiredOptWithArg("o", "operation",
897      "Expected: load/unload/unload_from_rack/isolate_regions");
898    this.addOptWithArg("m", "maxthreads",
899      "Define the maximum number of threads to use to unload and reload the regions");
900    this.addOptWithArg("i", "isolateRegionIds",
901      "Comma separated list of Region IDs hash to isolate on a RegionServer and put region server"
902        + " in draining mode. This option should only be used with '-o isolate_regions'."
903        + " By putting region server in decommission/draining mode, master can't assign any"
904        + " new region on this server. If one or more regions are not found OR failed to isolate"
905        + " successfully, utility will exist without putting RS in draining/decommission mode."
906        + " Ex. --isolateRegionIds id1,id2,id3 OR -i id1,id2,id3");
907    this.addOptWithArg("x", "excludefile",
908      "File with <hostname:port> per line to exclude as unload targets; default excludes only "
909        + "target host; useful for rack decommisioning.");
910    this.addOptWithArg("d", "designatedfile",
911      "File with <hostname:port> per line as unload targets;" + "default is all online hosts");
912    this.addOptWithArg("f", "filename",
913      "File to save regions list into unloading, or read from loading; "
914        + "default /tmp/<usernamehostname:port>");
915    this.addOptNoArg("n", "noack",
916      "Turn on No-Ack mode(default: false) which won't check if region is online on target "
917        + "RegionServer, hence best effort. This is more performant in unloading and loading "
918        + "but might lead to region being unavailable for some time till master reassigns it "
919        + "in case the move failed");
920    this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
921      + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
922  }
923
924  @Override
925  protected void processOptions(CommandLine cmd) {
926    String hostname = cmd.getOptionValue("r");
927    rmbuilder = new RegionMoverBuilder(hostname);
928    this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT);
929    if (cmd.hasOption('m')) {
930      rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
931    }
932    if (this.loadUnload.equals("isolate_regions") && cmd.hasOption("isolateRegionIds")) {
933      rmbuilder
934        .isolateRegionIdArray(Arrays.asList(cmd.getOptionValue("isolateRegionIds").split(",")));
935    }
936    if (cmd.hasOption('n')) {
937      rmbuilder.ack(false);
938    }
939    if (cmd.hasOption('f')) {
940      rmbuilder.filename(cmd.getOptionValue('f'));
941    }
942    if (cmd.hasOption('x')) {
943      rmbuilder.excludeFile(cmd.getOptionValue('x'));
944    }
945    if (cmd.hasOption('d')) {
946      rmbuilder.designatedFile(cmd.getOptionValue('d'));
947    }
948    if (cmd.hasOption('t')) {
949      rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
950    }
951    this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT);
952  }
953
954  @Override
955  protected int doWork() throws Exception {
956    boolean success;
957    try (RegionMover rm = rmbuilder.build()) {
958      if (loadUnload.equalsIgnoreCase("load")) {
959        success = rm.load();
960      } else if (loadUnload.equalsIgnoreCase("unload")) {
961        success = rm.unload();
962      } else if (loadUnload.equalsIgnoreCase("unload_from_rack")) {
963        success = rm.unloadFromRack();
964      } else if (loadUnload.equalsIgnoreCase("isolate_regions")) {
965        if (rm.isolateRegionIdArray != null && !rm.isolateRegionIdArray.isEmpty()) {
966          success = rm.isolateRegions();
967        } else {
968          LOG.error("Missing -i/--isolate_regions option with '-o isolate_regions' option");
969          LOG.error("Use -h or --help for usage instructions");
970          printUsage();
971          success = false;
972        }
973      } else {
974        printUsage();
975        success = false;
976      }
977    }
978    return (success ? 0 : 1);
979  }
980
981  public static void main(String[] args) {
982    try (RegionMover mover = new RegionMover()) {
983      mover.doStaticMain(args);
984    }
985  }
986}