001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.List;
025import java.util.concurrent.Callable;
026import java.util.concurrent.CompletionService;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.ExecutorCompletionService;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.client.RegionInfoBuilder;
036import org.apache.hadoop.hbase.client.TableDescriptor;
037import org.apache.hadoop.hbase.regionserver.HRegion;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
043
044/**
045 * Utility methods for interacting with the regions.
046 */
047@InterfaceAudience.Private
048public abstract class ModifyRegionUtils {
049  private static final Logger LOG = LoggerFactory.getLogger(ModifyRegionUtils.class);
050
051  private ModifyRegionUtils() {
052  }
053
054  public interface RegionFillTask {
055    void fillRegion(final HRegion region) throws IOException;
056  }
057
058  public interface RegionEditTask {
059    void editRegion(final RegionInfo region) throws IOException;
060  }
061
062  public static RegionInfo[] createRegionInfos(TableDescriptor tableDescriptor,
063    byte[][] splitKeys) {
064    long regionId = EnvironmentEdgeManager.currentTime();
065    RegionInfo[] hRegionInfos = null;
066    if (splitKeys == null || splitKeys.length == 0) {
067      hRegionInfos = new RegionInfo[] { RegionInfoBuilder.newBuilder(tableDescriptor.getTableName())
068        .setStartKey(null).setEndKey(null).setSplit(false).setRegionId(regionId).build() };
069    } else {
070      int numRegions = splitKeys.length + 1;
071      hRegionInfos = new RegionInfo[numRegions];
072      byte[] startKey = null;
073      byte[] endKey = null;
074      for (int i = 0; i < numRegions; i++) {
075        endKey = (i == splitKeys.length) ? null : splitKeys[i];
076        hRegionInfos[i] = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName())
077          .setStartKey(startKey).setEndKey(endKey).setSplit(false).setRegionId(regionId).build();
078        startKey = endKey;
079      }
080    }
081    return hRegionInfos;
082  }
083
084  /**
085   * Create new set of regions on the specified file-system. NOTE: that you should add the regions
086   * to hbase:meta after this operation.
087   * @param conf            {@link Configuration}
088   * @param rootDir         Root directory for HBase instance
089   * @param tableDescriptor description of the table
090   * @param newRegions      {@link RegionInfo} that describes the regions to create
091   * @param task            {@link RegionFillTask} custom code to populate region after creation
092   */
093  public static List<RegionInfo> createRegions(final Configuration conf, final Path rootDir,
094    final TableDescriptor tableDescriptor, final RegionInfo[] newRegions, final RegionFillTask task)
095    throws IOException {
096    if (newRegions == null) return null;
097    int regionNumber = newRegions.length;
098    ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(conf,
099      "RegionOpenAndInit-" + tableDescriptor.getTableName(), regionNumber);
100    try {
101      return createRegions(exec, conf, rootDir, tableDescriptor, newRegions, task);
102    } finally {
103      exec.shutdownNow();
104    }
105  }
106
107  /**
108   * Create new set of regions on the specified file-system. NOTE: that you should add the regions
109   * to hbase:meta after this operation.
110   * @param exec            Thread Pool Executor
111   * @param conf            {@link Configuration}
112   * @param rootDir         Root directory for HBase instance
113   * @param tableDescriptor description of the table
114   * @param newRegions      {@link RegionInfo} that describes the regions to create
115   * @param task            {@link RegionFillTask} custom code to populate region after creation
116   */
117  public static List<RegionInfo> createRegions(final ThreadPoolExecutor exec,
118    final Configuration conf, final Path rootDir, final TableDescriptor tableDescriptor,
119    final RegionInfo[] newRegions, final RegionFillTask task) throws IOException {
120    if (newRegions == null) return null;
121    int regionNumber = newRegions.length;
122    CompletionService<RegionInfo> completionService = new ExecutorCompletionService<>(exec);
123    List<RegionInfo> regionInfos = new ArrayList<>();
124    for (final RegionInfo newRegion : newRegions) {
125      completionService.submit(new Callable<RegionInfo>() {
126        @Override
127        public RegionInfo call() throws IOException {
128          return createRegion(conf, rootDir, tableDescriptor, newRegion, task);
129        }
130      });
131    }
132    try {
133      // wait for all regions to finish creation
134      for (int i = 0; i < regionNumber; i++) {
135        regionInfos.add(completionService.take().get());
136      }
137    } catch (InterruptedException e) {
138      LOG.error("Caught " + e + " during region creation");
139      throw new InterruptedIOException(e.getMessage());
140    } catch (ExecutionException e) {
141      throw new IOException(e);
142    }
143    return regionInfos;
144  }
145
146  /**
147   * Create new set of regions on the specified file-system.
148   * @param conf            {@link Configuration}
149   * @param rootDir         Root directory for HBase instance
150   * @param tableDescriptor description of the table
151   * @param newRegion       {@link RegionInfo} that describes the region to create
152   * @param task            {@link RegionFillTask} custom code to populate region after creation
153   */
154  public static RegionInfo createRegion(final Configuration conf, final Path rootDir,
155    final TableDescriptor tableDescriptor, final RegionInfo newRegion, final RegionFillTask task)
156    throws IOException {
157    // 1. Create HRegion
158    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
159    // unless I pass along via the conf.
160    Configuration confForWAL = new Configuration(conf);
161    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
162    HRegion region = HRegion.createHRegion(newRegion, rootDir, conf, tableDescriptor, null, false);
163    try {
164      // 2. Custom user code to interact with the created region
165      if (task != null) {
166        task.fillRegion(region);
167      }
168    } finally {
169      // 3. Close the new region to flush to disk. Close log file too.
170      region.close();
171    }
172    return region.getRegionInfo();
173  }
174
175  /**
176   * Execute the task on the specified set of regions.
177   * @param exec    Thread Pool Executor
178   * @param regions {@link RegionInfo} that describes the regions to edit
179   * @param task    {@link RegionFillTask} custom code to edit the region
180   */
181  public static void editRegions(final ThreadPoolExecutor exec,
182    final Collection<RegionInfo> regions, final RegionEditTask task) throws IOException {
183    final ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(exec);
184    for (final RegionInfo hri : regions) {
185      completionService.submit(new Callable<Void>() {
186        @Override
187        public Void call() throws IOException {
188          task.editRegion(hri);
189          return null;
190        }
191      });
192    }
193
194    try {
195      for (RegionInfo hri : regions) {
196        completionService.take().get();
197      }
198    } catch (InterruptedException e) {
199      throw new InterruptedIOException(e.getMessage());
200    } catch (ExecutionException e) {
201      throw new IOException(e.getCause());
202    }
203  }
204
205  /*
206   * used by createRegions() to get the thread pool executor based on the
207   * "hbase.hregion.open.and.init.threads.max" property.
208   */
209  static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
210    final String threadNamePrefix, int regionNumber) {
211    int maxThreads =
212      Math.min(regionNumber, conf.getInt("hbase.hregion.open.and.init.threads.max", 16));
213    ThreadPoolExecutor regionOpenAndInitThreadPool = Threads.getBoundedCachedThreadPool(maxThreads,
214      30L, TimeUnit.SECONDS, new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-pool-%d")
215        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
216    return regionOpenAndInitThreadPool;
217  }
218}