001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.List; 025import java.util.concurrent.Callable; 026import java.util.concurrent.CompletionService; 027import java.util.concurrent.ExecutionException; 028import java.util.concurrent.ExecutorCompletionService; 029import java.util.concurrent.ThreadPoolExecutor; 030import java.util.concurrent.TimeUnit; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.client.RegionInfo; 035import org.apache.hadoop.hbase.client.RegionInfoBuilder; 036import org.apache.hadoop.hbase.client.TableDescriptor; 037import org.apache.hadoop.hbase.regionserver.HRegion; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 043 044/** 045 * Utility methods for interacting with the regions. 046 */ 047@InterfaceAudience.Private 048public abstract class ModifyRegionUtils { 049 private static final Logger LOG = LoggerFactory.getLogger(ModifyRegionUtils.class); 050 051 private ModifyRegionUtils() { 052 } 053 054 public interface RegionFillTask { 055 void fillRegion(final HRegion region) throws IOException; 056 } 057 058 public interface RegionEditTask { 059 void editRegion(final RegionInfo region) throws IOException; 060 } 061 062 public static RegionInfo[] createRegionInfos(TableDescriptor tableDescriptor, 063 byte[][] splitKeys) { 064 long regionId = EnvironmentEdgeManager.currentTime(); 065 RegionInfo[] hRegionInfos = null; 066 if (splitKeys == null || splitKeys.length == 0) { 067 hRegionInfos = new RegionInfo[] { RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()) 068 .setStartKey(null).setEndKey(null).setSplit(false).setRegionId(regionId).build() }; 069 } else { 070 int numRegions = splitKeys.length + 1; 071 hRegionInfos = new RegionInfo[numRegions]; 072 byte[] startKey = null; 073 byte[] endKey = null; 074 for (int i = 0; i < numRegions; i++) { 075 endKey = (i == splitKeys.length) ? null : splitKeys[i]; 076 hRegionInfos[i] = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()) 077 .setStartKey(startKey).setEndKey(endKey).setSplit(false).setRegionId(regionId).build(); 078 startKey = endKey; 079 } 080 } 081 return hRegionInfos; 082 } 083 084 /** 085 * Create new set of regions on the specified file-system. NOTE: that you should add the regions 086 * to hbase:meta after this operation. 087 * @param conf {@link Configuration} 088 * @param rootDir Root directory for HBase instance 089 * @param tableDescriptor description of the table 090 * @param newRegions {@link RegionInfo} that describes the regions to create 091 * @param task {@link RegionFillTask} custom code to populate region after creation 092 */ 093 public static List<RegionInfo> createRegions(final Configuration conf, final Path rootDir, 094 final TableDescriptor tableDescriptor, final RegionInfo[] newRegions, final RegionFillTask task) 095 throws IOException { 096 if (newRegions == null) return null; 097 int regionNumber = newRegions.length; 098 ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(conf, 099 "RegionOpenAndInit-" + tableDescriptor.getTableName(), regionNumber); 100 try { 101 return createRegions(exec, conf, rootDir, tableDescriptor, newRegions, task); 102 } finally { 103 exec.shutdownNow(); 104 } 105 } 106 107 /** 108 * Create new set of regions on the specified file-system. NOTE: that you should add the regions 109 * to hbase:meta after this operation. 110 * @param exec Thread Pool Executor 111 * @param conf {@link Configuration} 112 * @param rootDir Root directory for HBase instance 113 * @param tableDescriptor description of the table 114 * @param newRegions {@link RegionInfo} that describes the regions to create 115 * @param task {@link RegionFillTask} custom code to populate region after creation 116 */ 117 public static List<RegionInfo> createRegions(final ThreadPoolExecutor exec, 118 final Configuration conf, final Path rootDir, final TableDescriptor tableDescriptor, 119 final RegionInfo[] newRegions, final RegionFillTask task) throws IOException { 120 if (newRegions == null) return null; 121 int regionNumber = newRegions.length; 122 CompletionService<RegionInfo> completionService = new ExecutorCompletionService<>(exec); 123 List<RegionInfo> regionInfos = new ArrayList<>(); 124 for (final RegionInfo newRegion : newRegions) { 125 completionService.submit(new Callable<RegionInfo>() { 126 @Override 127 public RegionInfo call() throws IOException { 128 return createRegion(conf, rootDir, tableDescriptor, newRegion, task); 129 } 130 }); 131 } 132 try { 133 // wait for all regions to finish creation 134 for (int i = 0; i < regionNumber; i++) { 135 regionInfos.add(completionService.take().get()); 136 } 137 } catch (InterruptedException e) { 138 LOG.error("Caught " + e + " during region creation"); 139 throw new InterruptedIOException(e.getMessage()); 140 } catch (ExecutionException e) { 141 throw new IOException(e); 142 } 143 return regionInfos; 144 } 145 146 /** 147 * Create new set of regions on the specified file-system. 148 * @param conf {@link Configuration} 149 * @param rootDir Root directory for HBase instance 150 * @param tableDescriptor description of the table 151 * @param newRegion {@link RegionInfo} that describes the region to create 152 * @param task {@link RegionFillTask} custom code to populate region after creation 153 */ 154 public static RegionInfo createRegion(final Configuration conf, final Path rootDir, 155 final TableDescriptor tableDescriptor, final RegionInfo newRegion, final RegionFillTask task) 156 throws IOException { 157 // 1. Create HRegion 158 // The WAL subsystem will use the default rootDir rather than the passed in rootDir 159 // unless I pass along via the conf. 160 Configuration confForWAL = new Configuration(conf); 161 confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); 162 HRegion region = HRegion.createHRegion(newRegion, rootDir, conf, tableDescriptor, null, false); 163 try { 164 // 2. Custom user code to interact with the created region 165 if (task != null) { 166 task.fillRegion(region); 167 } 168 } finally { 169 // 3. Close the new region to flush to disk. Close log file too. 170 region.close(); 171 } 172 return region.getRegionInfo(); 173 } 174 175 /** 176 * Execute the task on the specified set of regions. 177 * @param exec Thread Pool Executor 178 * @param regions {@link RegionInfo} that describes the regions to edit 179 * @param task {@link RegionFillTask} custom code to edit the region 180 */ 181 public static void editRegions(final ThreadPoolExecutor exec, 182 final Collection<RegionInfo> regions, final RegionEditTask task) throws IOException { 183 final ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(exec); 184 for (final RegionInfo hri : regions) { 185 completionService.submit(new Callable<Void>() { 186 @Override 187 public Void call() throws IOException { 188 task.editRegion(hri); 189 return null; 190 } 191 }); 192 } 193 194 try { 195 for (RegionInfo hri : regions) { 196 completionService.take().get(); 197 } 198 } catch (InterruptedException e) { 199 throw new InterruptedIOException(e.getMessage()); 200 } catch (ExecutionException e) { 201 throw new IOException(e.getCause()); 202 } 203 } 204 205 /* 206 * used by createRegions() to get the thread pool executor based on the 207 * "hbase.hregion.open.and.init.threads.max" property. 208 */ 209 static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf, 210 final String threadNamePrefix, int regionNumber) { 211 int maxThreads = 212 Math.min(regionNumber, conf.getInt("hbase.hregion.open.and.init.threads.max", 16)); 213 ThreadPoolExecutor regionOpenAndInitThreadPool = Threads.getBoundedCachedThreadPool(maxThreads, 214 30L, TimeUnit.SECONDS, new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-pool-%d") 215 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 216 return regionOpenAndInitThreadPool; 217 } 218}