001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.concurrent.Callable;
023import org.apache.hadoop.hbase.HRegionLocation;
024import org.apache.hadoop.hbase.ServerName;
025import org.apache.hadoop.hbase.client.Admin;
026import org.apache.hadoop.hbase.client.Connection;
027import org.apache.hadoop.hbase.client.RegionInfo;
028import org.apache.hadoop.hbase.client.ResultScanner;
029import org.apache.hadoop.hbase.client.Scan;
030import org.apache.hadoop.hbase.client.Table;
031import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * Move Regions and make sure that they are up on the target server.If a region movement fails we
038 * exit as failure
039 */
040@InterfaceAudience.Private
041class MoveWithAck implements Callable<Boolean> {
042
043  private static final Logger LOG = LoggerFactory.getLogger(MoveWithAck.class);
044
045  private final RegionInfo region;
046  private final ServerName targetServer;
047  private final List<RegionInfo> movedRegions;
048  private final ServerName sourceServer;
049  private final Connection conn;
050  private final Admin admin;
051
052  MoveWithAck(Connection conn, RegionInfo regionInfo, ServerName sourceServer,
053    ServerName targetServer, List<RegionInfo> movedRegions) throws IOException {
054    this.conn = conn;
055    this.region = regionInfo;
056    this.targetServer = targetServer;
057    this.movedRegions = movedRegions;
058    this.sourceServer = sourceServer;
059    this.admin = conn.getAdmin();
060  }
061
062  @Override
063  public Boolean call() throws IOException, InterruptedException {
064    boolean moved = false;
065    int count = 0;
066    int retries = admin.getConfiguration().getInt(RegionMover.MOVE_RETRIES_MAX_KEY,
067      RegionMover.DEFAULT_MOVE_RETRIES_MAX);
068    int maxWaitInSeconds = admin.getConfiguration().getInt(RegionMover.MOVE_WAIT_MAX_KEY,
069      RegionMover.DEFAULT_MOVE_WAIT_MAX);
070    long startTime = EnvironmentEdgeManager.currentTime();
071    boolean sameServer = true;
072    // Assert we can scan the region in its current location
073    isSuccessfulScan(region);
074    LOG.info("Moving region: {} from {} to {}", region.getRegionNameAsString(), sourceServer,
075      targetServer);
076    while (count < retries && sameServer) {
077      if (count > 0) {
078        LOG.debug("Retry {} of maximum {} for region: {}", count, retries,
079          region.getRegionNameAsString());
080      }
081      count = count + 1;
082      admin.move(region.getEncodedNameAsBytes(), targetServer);
083      long maxWait = startTime + (maxWaitInSeconds * 1000);
084      while (EnvironmentEdgeManager.currentTime() < maxWait) {
085        sameServer = isSameServer(region, sourceServer);
086        if (!sameServer) {
087          break;
088        }
089        Thread.sleep(1000);
090      }
091    }
092    if (sameServer) {
093      LOG.error("Region: {} stuck on {} for {} sec , newServer={}", region.getRegionNameAsString(),
094        this.sourceServer, getTimeDiffInSec(startTime), this.targetServer);
095    } else {
096      isSuccessfulScan(region);
097      LOG.info("Moved Region {} , cost (sec): {}", region.getRegionNameAsString(),
098        getTimeDiffInSec(startTime));
099      moved = true;
100      movedRegions.add(region);
101    }
102    return moved;
103  }
104
105  private static String getTimeDiffInSec(long startTime) {
106    return String.format("%.3f", (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000);
107  }
108
109  /**
110   * Tries to scan a row from passed region
111   */
112  private void isSuccessfulScan(RegionInfo region) throws IOException {
113    Scan scan = new Scan().withStartRow(region.getStartKey()).withStopRow(region.getEndKey(), false)
114      .setRaw(true).setOneRowLimit().setMaxResultSize(1L).setCaching(1)
115      .setFilter(new FirstKeyOnlyFilter()).setCacheBlocks(false);
116    try (Table table = conn.getTable(region.getTable());
117      ResultScanner scanner = table.getScanner(scan)) {
118      scanner.next();
119    } catch (IOException e) {
120      LOG.error("Could not scan region: {}", region.getEncodedName(), e);
121      throw e;
122    }
123  }
124
125  /**
126   * Returns true if passed region is still on serverName when we look at hbase:meta.
127   * @return true if region is hosted on serverName otherwise false
128   */
129  private boolean isSameServer(RegionInfo region, ServerName serverName) throws IOException {
130    ServerName serverForRegion = getServerNameForRegion(region, admin, conn);
131    return serverForRegion != null && serverForRegion.equals(serverName);
132  }
133
134  /**
135   * Get servername that is up in hbase:meta hosting the given region. this is hostname + port +
136   * startcode comma-delimited. Can return null
137   * @return regionServer hosting the given region
138   */
139  static ServerName getServerNameForRegion(RegionInfo region, Admin admin, Connection conn)
140    throws IOException {
141    if (!admin.isTableEnabled(region.getTable())) {
142      return null;
143    }
144    HRegionLocation loc;
145    try {
146      loc = conn.getRegionLocator(region.getTable()).getRegionLocation(region.getStartKey(),
147        region.getReplicaId(), true);
148    } catch (IOException e) {
149      if (e.getMessage() != null && e.getMessage().startsWith("Unable to find region for")) {
150        return null;
151      }
152      throw e;
153    }
154    if (loc != null) {
155      return loc.getServerName();
156    } else {
157      return null;
158    }
159  }
160
161}