/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * It is used for replicating HFile entries. It will first copy all the hfiles in parallel to a
 * local staging directory and then use {@link LoadIncrementalHFiles} to prepare a collection of
 * {@link LoadQueueItem} which will finally be loaded (replicated) into the table of this cluster.
 * Call {@link #close()} when done.
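 * <p>
 * A rough usage sketch (the variable names below are illustrative, not part of this class):
 *
 * <pre>
 * try (HFileReplicator replicator = new HFileReplicator(peerClusterConf, baseNamespaceDirPath,
 *   hfileArchiveDirPath, tableQueueMap, localConf, connection, sourceClusterIds)) {
 *   replicator.replicate();
 * }
 * </pre>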
 */
@InterfaceAudience.Private
public class HFileReplicator implements Closeable {
  /** Maximum number of threads to allow in pool to copy hfiles during replication */
  public static final String REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY =
    "hbase.replication.bulkload.copy.maxthreads";
  public static final int REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT = 10;
  /** Number of hfiles to copy per thread during replication */
  public static final String REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY =
    "hbase.replication.bulkload.copy.hfiles.perthread";
  public static final int REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT = 10;

  private static final Logger LOG = LoggerFactory.getLogger(HFileReplicator.class);
  private static final String UNDERSCORE = "_";
  private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");

  private Configuration sourceClusterConf;
  private String sourceBaseNamespaceDirPath;
  private String sourceHFileArchiveDirPath;
  private Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap;
  private FileSystem sinkFs;
  private FsDelegationToken fsDelegationToken;
  private UserProvider userProvider;
  private Configuration conf;
  private Connection connection;
  private Path hbaseStagingDir;
  private ThreadPoolExecutor exec;
  private int maxCopyThreads;
  private int copiesPerThread;
  private List<String> sourceClusterIds;

  public HFileReplicator(Configuration sourceClusterConf, String sourceBaseNamespaceDirPath,
    String sourceHFileArchiveDirPath, Map<String, List<Pair<byte[], List<String>>>> tableQueueMap,
    Configuration conf, Connection connection, List<String> sourceClusterIds) throws IOException {
    this.sourceClusterConf = sourceClusterConf;
    this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
    this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
    this.bulkLoadHFileMap = tableQueueMap;
    this.conf = conf;
    this.connection = connection;
    this.sourceClusterIds = sourceClusterIds;

    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    this.hbaseStagingDir =
      new Path(CommonFSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
    this.maxCopyThreads = this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
      REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
    this.exec = Threads.getBoundedCachedThreadPool(maxCopyThreads, 60, TimeUnit.SECONDS,
      new ThreadFactoryBuilder().setDaemon(true)
        .setNameFormat("HFileReplicationCopier-%1$d-" + this.sourceBaseNamespaceDirPath).build());
    this.copiesPerThread = conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
      REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);

    sinkFs = FileSystem.get(conf);
  }

  @Override
  public void close() throws IOException {
    if (this.exec != null) {
      this.exec.shutdown();
    }
  }
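
  /**
   * Copies the HFiles referenced by the bulk load map to staging directories on this cluster and
   * bulk loads them into the corresponding tables, retrying up to the configured maximum number
   * of bulk load retries.
   */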
  public Void replicate() throws IOException {
    // Copy all the hfiles to the local file system
    Map<String, Path> tableStagingDirsMap = copyHFilesToStagingDir();

    int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);

    for (Entry<String, Path> tableStagingDir : tableStagingDirsMap.entrySet()) {
      String tableNameString = tableStagingDir.getKey();
      Path stagingDir = tableStagingDir.getValue();

      LoadIncrementalHFiles loadHFiles = null;
      try {
        loadHFiles = new LoadIncrementalHFiles(conf);
        loadHFiles.setClusterIds(sourceClusterIds);
      } catch (Exception e) {
        LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded data.",
          e);
        throw new IOException(e);
      }
      Configuration newConf = HBaseConfiguration.create(conf);
      newConf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
      loadHFiles.setConf(newConf);

      TableName tableName = TableName.valueOf(tableNameString);
      Table table = this.connection.getTable(tableName);

      // Prepare the queue of hfiles to be loaded (replicated)
      Deque<LoadQueueItem> queue = new LinkedList<>();
      loadHFiles.prepareHFileQueue(stagingDir, table, queue, false);

      if (queue.isEmpty()) {
        LOG.warn("Did not find any files to replicate in directory {}", stagingDir.toUri());
        return null;
      }

      try (RegionLocator locator = connection.getRegionLocator(tableName)) {
        fsDelegationToken.acquireDelegationToken(sinkFs);
        // Set the staging directory which will be used by LoadIncrementalHFiles for loading the
        // data
        loadHFiles.setBulkToken(stagingDir.toString());
        doBulkLoad(loadHFiles, table, queue, locator, maxRetries);
      } finally {
        cleanup(stagingDir.toString(), table);
      }
    }
    return null;
  }

  private void doBulkLoad(LoadIncrementalHFiles loadHFiles, Table table, Deque<LoadQueueItem> queue,
    RegionLocator locator, int maxRetries) throws IOException {
    int count = 0;
    Pair<byte[][], byte[][]> startEndKeys;
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
      startEndKeys = locator.getStartEndKeys();
      if (count != 0) {
        LOG.warn("Error replicating HFiles; retry={} with {} remaining.", count, queue.size());
      }

      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException("Retry attempted " + count + " times without completing, bailing.");
      }
      count++;

      // Try bulk load
      loadHFiles.loadHFileQueue(table, connection, queue, startEndKeys);
    }
  }

  private void cleanup(String stagingDir, Table table) {
    // Release the file system delegation token
    fsDelegationToken.releaseDelegationToken();
    // Delete the staging directory
    if (stagingDir != null) {
      try {
        sinkFs.delete(new Path(stagingDir), true);
      } catch (IOException e) {
        LOG.warn("Failed to delete the staging directory " + stagingDir, e);
      }
    }
    // Do not close the file system
    /*
     * if (sinkFs != null) { try { sinkFs.close(); } catch (IOException e) { LOG.warn(
     * "Failed to close the file system"); } }
     */

    // Close the table
    if (table != null) {
      try {
        table.close();
      } catch (IOException e) {
        LOG.warn("Failed to close the table.", e);
      }
    }
  }
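
  /**
   * Copies every HFile named in the bulk load map from the source cluster file system to a
   * per-table staging directory on this cluster, spreading the copies across the copier thread
   * pool.
   * @return a map from table name to the staging directory holding its copied hfiles
   */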
  private Map<String, Path> copyHFilesToStagingDir() throws IOException {
    Map<String, Path> mapOfCopiedHFiles = new HashMap<>();
    Pair<byte[], List<String>> familyHFilePathsPair;
    List<String> hfilePaths;
    byte[] family;
    Path familyStagingDir;
    int familyHFilePathsPairsListSize;
    int totalNoOfHFiles;
    List<Pair<byte[], List<String>>> familyHFilePathsPairsList;
    FileSystem sourceFs = null;

    try {
      Path sourceClusterPath = new Path(sourceBaseNamespaceDirPath);
      /*
       * Path#getFileSystem will by default get the FS from cache. If both source and sink
       * clusters have the same FS name service then it will return the peer cluster FS. To avoid
       * this we explicitly disable loading the FS from cache, so that a new FS is created with
       * the source cluster configuration.
       */
      String sourceScheme = sourceClusterPath.toUri().getScheme();
      String disableCacheName =
        String.format("fs.%s.impl.disable.cache", new Object[] { sourceScheme });
      sourceClusterConf.setBoolean(disableCacheName, true);

      sourceFs = sourceClusterPath.getFileSystem(sourceClusterConf);

      User user = userProvider.getCurrent();
      // For each table name in the map
      for (Entry<String, List<Pair<byte[], List<String>>>> tableEntry : bulkLoadHFileMap
        .entrySet()) {
        String tableName = tableEntry.getKey();

        // Create staging directory for each table
        Path stagingDir = createStagingDir(hbaseStagingDir, user, TableName.valueOf(tableName));

        familyHFilePathsPairsList = tableEntry.getValue();
        familyHFilePathsPairsListSize = familyHFilePathsPairsList.size();

        // For each list of family hfile paths pair in the table
        for (int i = 0; i < familyHFilePathsPairsListSize; i++) {
          familyHFilePathsPair = familyHFilePathsPairsList.get(i);

          family = familyHFilePathsPair.getFirst();
          hfilePaths = familyHFilePathsPair.getSecond();

          familyStagingDir = new Path(stagingDir, Bytes.toString(family));
          totalNoOfHFiles = hfilePaths.size();

          // For each list of hfile paths for the family
          List<Future<Void>> futures = new ArrayList<>();
          Callable<Void> c;
          Future<Void> future;
          int currentCopied = 0;
          // Copy the hfiles in parallel, copiesPerThread hfiles per copier task
          while (totalNoOfHFiles > currentCopied + this.copiesPerThread) {
            c = new Copier(sourceFs, familyStagingDir,
              hfilePaths.subList(currentCopied, currentCopied + this.copiesPerThread));
            future = exec.submit(c);
            futures.add(future);
            currentCopied += this.copiesPerThread;
          }

          int remaining = totalNoOfHFiles - currentCopied;
          if (remaining > 0) {
            c = new Copier(sourceFs, familyStagingDir,
              hfilePaths.subList(currentCopied, currentCopied + remaining));
            future = exec.submit(c);
            futures.add(future);
          }

          for (Future<Void> f : futures) {
            try {
              f.get();
            } catch (InterruptedException e) {
              InterruptedIOException iioe = new InterruptedIOException(
                "Failed to copy HFiles to local file system. This will be retried again "
                  + "by the source cluster.");
              iioe.initCause(e);
              throw iioe;
            } catch (ExecutionException e) {
              throw new IOException("Failed to copy HFiles to local file system. This will "
                + "be retried again by the source cluster.", e);
            }
          }
        }
        // Add the staging directory for this table. The staging directory contains all the
        // hfiles belonging to this table.
        mapOfCopiedHFiles.put(tableName, stagingDir);
      }
      return mapOfCopiedHFiles;
    } finally {
      if (sourceFs != null) {
        sourceFs.close();
      }
      if (exec != null) {
        exec.shutdown();
      }
    }
  }
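
  /**
   * Creates a per-table staging directory under the base staging directory, named from the
   * current user's short name, the table name and a random suffix.
   */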
  private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException {
    String tblName = tableName.getNameAsString().replace(":", UNDERSCORE);
    int RANDOM_WIDTH = 320;
    int RANDOM_RADIX = 32;
    String doubleUnderScore = UNDERSCORE + UNDERSCORE;
    String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore
      + (new BigInteger(RANDOM_WIDTH, ThreadLocalRandom.current()).toString(RANDOM_RADIX));
    return createStagingDir(baseDir, user, randomDir);
  }

  private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException {
    Path p = new Path(baseDir, randomDir);
    sinkFs.mkdirs(p, PERM_ALL_ACCESS);
    sinkFs.setPermission(p, PERM_ALL_ACCESS);
    return p;
  }

  /**
   * This class will copy the given hfiles from the given source file system to the given local
   * file system staging directory.
   */
  private class Copier implements Callable<Void> {
    private FileSystem sourceFs;
    private Path stagingDir;
    private List<String> hfiles;

    public Copier(FileSystem sourceFs, final Path stagingDir, final List<String> hfiles)
      throws IOException {
      this.sourceFs = sourceFs;
      this.stagingDir = stagingDir;
      this.hfiles = hfiles;
    }

    @Override
    public Void call() throws IOException {
      Path sourceHFilePath;
      Path localHFilePath;
      int totalHFiles = hfiles.size();
      for (int i = 0; i < totalHFiles; i++) {
        sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfiles.get(i));
        localHFilePath = new Path(stagingDir, sourceHFilePath.getName());
        try {
          FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
          // Any exception other than FNFE will fail the replication request and the source
          // will retry replicating this data.
        } catch (FileNotFoundException e) {
          LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
            + ". Trying to copy from hfile archive directory.", e);
          sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfiles.get(i));

          try {
            FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
          } catch (FileNotFoundException e1) {
            // This means the hfile does not exist anywhere in the source cluster FS, so we
            // cannot do anything here; just log and continue.
            LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
              + ". Hence ignoring this hfile from replication.", e1);
            continue;
          }
        }
        sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS);
      }
      return null;
    }
  }
}