001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.tool; 019 020import static java.lang.String.format; 021 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.io.InterruptedIOException; 025import java.nio.ByteBuffer; 026import java.util.ArrayDeque; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collection; 030import java.util.Collections; 031import java.util.Deque; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.List; 035import java.util.Map; 036import java.util.Map.Entry; 037import java.util.Optional; 038import java.util.Set; 039import java.util.SortedMap; 040import java.util.TreeMap; 041import java.util.UUID; 042import java.util.concurrent.Callable; 043import java.util.concurrent.ExecutionException; 044import java.util.concurrent.ExecutorService; 045import java.util.concurrent.Future; 046import java.util.concurrent.LinkedBlockingQueue; 047import java.util.concurrent.ThreadPoolExecutor; 048import java.util.concurrent.TimeUnit; 049import java.util.concurrent.atomic.AtomicInteger; 050import java.util.stream.Collectors; 051import org.apache.commons.lang3.mutable.MutableInt; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.conf.Configured; 054import org.apache.hadoop.fs.FileStatus; 055import org.apache.hadoop.fs.FileSystem; 056import org.apache.hadoop.fs.Path; 057import org.apache.hadoop.fs.permission.FsPermission; 058import org.apache.hadoop.hbase.HBaseConfiguration; 059import org.apache.hadoop.hbase.HConstants; 060import org.apache.hadoop.hbase.TableName; 061import org.apache.hadoop.hbase.TableNotFoundException; 062import org.apache.hadoop.hbase.client.Admin; 063import org.apache.hadoop.hbase.client.ClientServiceCallable; 064import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 065import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 066import org.apache.hadoop.hbase.client.Connection; 067import org.apache.hadoop.hbase.client.ConnectionFactory; 068import org.apache.hadoop.hbase.client.RegionLocator; 069import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 070import org.apache.hadoop.hbase.client.SecureBulkLoadClient; 071import org.apache.hadoop.hbase.client.Table; 072import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 073import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 074import org.apache.hadoop.hbase.io.HFileLink; 075import org.apache.hadoop.hbase.io.HalfStoreFileReader; 076import org.apache.hadoop.hbase.io.Reference; 077import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 078import org.apache.hadoop.hbase.io.hfile.CacheConfig; 079import org.apache.hadoop.hbase.io.hfile.HFile; 080import org.apache.hadoop.hbase.io.hfile.HFileContext; 081import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 082import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; 083import org.apache.hadoop.hbase.io.hfile.HFileInfo; 084import org.apache.hadoop.hbase.io.hfile.HFileScanner; 085import org.apache.hadoop.hbase.io.hfile.ReaderContext; 086import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder; 087import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 088import org.apache.hadoop.hbase.regionserver.BloomType; 089import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 090import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 091import org.apache.hadoop.hbase.regionserver.StoreUtils; 092import org.apache.hadoop.hbase.security.UserProvider; 093import org.apache.hadoop.hbase.security.token.FsDelegationToken; 094import org.apache.hadoop.hbase.util.Bytes; 095import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 096import org.apache.hadoop.hbase.util.FSUtils; 097import org.apache.hadoop.hbase.util.FSVisitor; 098import org.apache.hadoop.hbase.util.Pair; 099import org.apache.hadoop.util.Tool; 100import org.apache.hadoop.util.ToolRunner; 101import org.apache.yetus.audience.InterfaceAudience; 102import org.slf4j.Logger; 103import org.slf4j.LoggerFactory; 104 105import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap; 106import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 107import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 108import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; 109import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps; 110import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 111 112/** 113 * Tool to load the output of HFileOutputFormat into an existing table. 114 * <p/> 115 * Notice that, by default, this class should be kept till 4.0.0, but as this is a bad practice that 116 * we expose an implementation class instead of an interface, we want to fix it ASAP. That's why we 117 * will remove this class completely in 3.0.0. Please change your code to use 118 * {@link BulkLoadHFiles}. 119 * @deprecated since 2.2.0, will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead. Please 120 * rewrite your code if you rely on methods other than the {@link #run(Map, TableName)} 121 * and {@link #run(String, TableName)}, as all the methods other than them will be 122 * removed with no replacement. 123 */ 124@Deprecated 125@InterfaceAudience.Public 126public class LoadIncrementalHFiles extends Configured implements Tool { 127 128 private static final Logger LOG = LoggerFactory.getLogger(LoadIncrementalHFiles.class); 129 130 /** 131 * @deprecated since 2.2.0, will be removed in 3.0.0, with no replacement. End user should not 132 * depend on this value. 133 */ 134 @Deprecated 135 public static final String NAME = BulkLoadHFilesTool.NAME; 136 static final String RETRY_ON_IO_EXCEPTION = BulkLoadHFiles.RETRY_ON_IO_EXCEPTION; 137 public static final String MAX_FILES_PER_REGION_PER_FAMILY = 138 BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY; 139 private static final String ASSIGN_SEQ_IDS = BulkLoadHFiles.ASSIGN_SEQ_IDS; 140 public final static String CREATE_TABLE_CONF_KEY = BulkLoadHFiles.CREATE_TABLE_CONF_KEY; 141 public final static String IGNORE_UNMATCHED_CF_CONF_KEY = 142 BulkLoadHFiles.IGNORE_UNMATCHED_CF_CONF_KEY; 143 public final static String ALWAYS_COPY_FILES = BulkLoadHFiles.ALWAYS_COPY_FILES; 144 145 public static final String FAIL_IF_NEED_SPLIT_HFILE = 146 "hbase.loadincremental.fail.if.need.split.hfile"; 147 148 // We use a '.' prefix which is ignored when walking directory trees 149 // above. It is invalid family name. 150 static final String TMP_DIR = ".tmp"; 151 152 private int maxFilesPerRegionPerFamily; 153 private boolean assignSeqIds; 154 private boolean bulkLoadByFamily; 155 156 // Source delegation token 157 private FsDelegationToken fsDelegationToken; 158 private UserProvider userProvider; 159 private int nrThreads; 160 private AtomicInteger numRetries; 161 private RpcControllerFactory rpcControllerFactory; 162 163 private String bulkToken; 164 165 private List<String> clusterIds = new ArrayList<>(); 166 167 private boolean replicate = true; 168 169 private boolean failIfNeedSplitHFile = false; 170 171 /** 172 * Represents an HFile waiting to be loaded. An queue is used in this class in order to support 173 * the case where a region has split during the process of the load. When this happens, the HFile 174 * is split into two physical parts across the new region boundary, and each part is added back 175 * into the queue. The import process finishes when the queue is empty. 176 * @deprecated since 2.2.0 and will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead. 177 * @see BulkLoadHFiles 178 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21782">HBASE-21782</a> 179 */ 180 @InterfaceAudience.Public 181 @Deprecated 182 public static class LoadQueueItem extends BulkLoadHFiles.LoadQueueItem { 183 184 public LoadQueueItem(byte[] family, Path hfilePath) { 185 super(family, hfilePath); 186 } 187 } 188 189 public LoadIncrementalHFiles(Configuration conf) { 190 // make a copy, just to be sure we're not overriding someone else's config 191 super(HBaseConfiguration.create(conf)); 192 initialize(); 193 } 194 195 public void initialize() { 196 Configuration conf = getConf(); 197 // disable blockcache for tool invocation, see HBASE-10500 198 conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0); 199 userProvider = UserProvider.instantiate(conf); 200 fsDelegationToken = new FsDelegationToken(userProvider, "renewer"); 201 assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true); 202 maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32); 203 bulkLoadByFamily = conf.getBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, false); 204 failIfNeedSplitHFile = conf.getBoolean(FAIL_IF_NEED_SPLIT_HFILE, false); 205 nrThreads = 206 conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors()); 207 numRetries = new AtomicInteger(0); 208 rpcControllerFactory = new RpcControllerFactory(conf); 209 } 210 211 private void usage() { 212 System.err.println("Usage: " + "bin/hbase completebulkload [OPTIONS] " 213 + "</PATH/TO/HFILEOUTPUTFORMAT-OUTPUT> <TABLENAME>\n" 214 + "Loads directory of hfiles -- a region dir or product of HFileOutputFormat -- " 215 + "into an hbase table.\n" + "OPTIONS (for other -D options, see source code):\n" + " -D" 216 + CREATE_TABLE_CONF_KEY + "=no whether to create table; when 'no', target " 217 + "table must exist.\n" + " -D" + IGNORE_UNMATCHED_CF_CONF_KEY 218 + "=yes to ignore unmatched column families.\n" 219 + " -loadTable for when directory of files to load has a depth of 3; target table must " 220 + "exist;\n" + " must be last of the options on command line.\n" 221 + "See http://hbase.apache.org/book.html#arch.bulk.load.complete.strays for " 222 + "documentation.\n"); 223 } 224 225 /** 226 * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the 227 * passed directory and validates whether the prepared queue has all the valid table column 228 * families in it. 229 * @param hfilesDir directory containing list of hfiles to be loaded into the table 230 * @param table table to which hfiles should be loaded 231 * @param queue queue which needs to be loaded into the table 232 * @param validateHFile if true hfiles will be validated for its format 233 * @throws IOException If any I/O or network error occurred 234 */ 235 public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue, 236 boolean validateHFile) throws IOException { 237 prepareHFileQueue(hfilesDir, table, queue, validateHFile, false); 238 } 239 240 /** 241 * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the 242 * passed directory and validates whether the prepared queue has all the valid table column 243 * families in it. 244 * @param hfilesDir directory containing list of hfiles to be loaded into the table 245 * @param table table to which hfiles should be loaded 246 * @param queue queue which needs to be loaded into the table 247 * @param validateHFile if true hfiles will be validated for its format 248 * @param silence true to ignore unmatched column families 249 * @throws IOException If any I/O or network error occurred 250 */ 251 public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue, 252 boolean validateHFile, boolean silence) throws IOException { 253 discoverLoadQueue(queue, hfilesDir, validateHFile); 254 validateFamiliesInHFiles(table, queue, silence); 255 } 256 257 /** 258 * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the 259 * passed directory and validates whether the prepared queue has all the valid table column 260 * families in it. 261 * @param map map of family to List of hfiles 262 * @param table table to which hfiles should be loaded 263 * @param queue queue which needs to be loaded into the table 264 * @param silence true to ignore unmatched column families 265 * @throws IOException If any I/O or network error occurred 266 */ 267 public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table, 268 Deque<LoadQueueItem> queue, boolean silence) throws IOException { 269 populateLoadQueue(queue, map); 270 validateFamiliesInHFiles(table, queue, silence); 271 } 272 273 /** 274 * Perform a bulk load of the given directory into the given pre-existing table. This method is 275 * not threadsafe. 276 * @param hfofDir the directory that was provided as the output path of a job using 277 * HFileOutputFormat 278 * @param admin the Admin 279 * @param table the table to load into 280 * @param regionLocator region locator 281 * @throws TableNotFoundException if table does not yet exist 282 */ 283 public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table, 284 RegionLocator regionLocator) throws TableNotFoundException, IOException { 285 return doBulkLoad(hfofDir, admin, table, regionLocator, false, false); 286 } 287 288 /** 289 * Perform a bulk load of the given directory into the given pre-existing table. This method is 290 * not threadsafe. 291 * @param map map of family to List of hfiles 292 * @param admin the Admin 293 * @param table the table to load into 294 * @param regionLocator region locator 295 * @param silence true to ignore unmatched column families 296 * @param copyFile always copy hfiles if true 297 * @throws TableNotFoundException if table does not yet exist 298 */ 299 public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, 300 Table table, RegionLocator regionLocator, boolean silence, boolean copyFile) 301 throws TableNotFoundException, IOException { 302 if (!admin.isTableAvailable(regionLocator.getName())) { 303 throw new TableNotFoundException("Table " + table.getName() + " is not currently available."); 304 } 305 // LQI queue does not need to be threadsafe -- all operations on this queue 306 // happen in this thread 307 Deque<LoadQueueItem> queue = new ArrayDeque<>(); 308 ExecutorService pool = null; 309 SecureBulkLoadClient secureClient = null; 310 try { 311 prepareHFileQueue(map, table, queue, silence); 312 if (queue.isEmpty()) { 313 LOG.warn("Bulk load operation did not get any files to load"); 314 return Collections.emptyMap(); 315 } 316 pool = createExecutorService(); 317 secureClient = new SecureBulkLoadClient(table.getConfiguration(), table); 318 return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile); 319 } finally { 320 cleanup(admin, queue, pool, secureClient); 321 } 322 } 323 324 /** 325 * Perform a bulk load of the given directory into the given pre-existing table. This method is 326 * not threadsafe. 327 * @param hfofDir the directory that was provided as the output path of a job using 328 * HFileOutputFormat 329 * @param admin the Admin 330 * @param table the table to load into 331 * @param regionLocator region locator 332 * @param silence true to ignore unmatched column families 333 * @param copyFile always copy hfiles if true 334 * @throws TableNotFoundException if table does not yet exist 335 */ 336 public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table, 337 RegionLocator regionLocator, boolean silence, boolean copyFile) 338 throws TableNotFoundException, IOException { 339 if (!admin.isTableAvailable(regionLocator.getName())) { 340 throw new TableNotFoundException("Table " + table.getName() + " is not currently available."); 341 } 342 343 /* 344 * Checking hfile format is a time-consuming operation, we should have an option to skip this 345 * step when bulkloading millions of HFiles. See HBASE-13985. 346 */ 347 boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true); 348 if (!validateHFile) { 349 LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " 350 + "are not correct. If you fail to read data from your table after using this " 351 + "option, consider removing the files and bulkload again without this option. " 352 + "See HBASE-13985"); 353 } 354 // LQI queue does not need to be threadsafe -- all operations on this queue 355 // happen in this thread 356 Deque<LoadQueueItem> queue = new ArrayDeque<>(); 357 ExecutorService pool = null; 358 SecureBulkLoadClient secureClient = null; 359 try { 360 prepareHFileQueue(hfofDir, table, queue, validateHFile, silence); 361 362 if (queue.isEmpty()) { 363 LOG.warn( 364 "Bulk load operation did not find any files to load in directory {}. " 365 + "Does it contain files in subdirectories that correspond to column family names?", 366 (hfofDir != null ? hfofDir.toUri().toString() : "")); 367 return Collections.emptyMap(); 368 } 369 pool = createExecutorService(); 370 secureClient = new SecureBulkLoadClient(table.getConfiguration(), table); 371 return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile); 372 } finally { 373 cleanup(admin, queue, pool, secureClient); 374 } 375 } 376 377 /** 378 * Used by the replication sink to load the hfiles from the source cluster. It does the following, 379 * <ol> 380 * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li> 381 * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap) 382 * </li> 383 * </ol> 384 * @param table Table to which these hfiles should be loaded to 385 * @param conn Connection to use 386 * @param queue {@link LoadQueueItem} has hfiles yet to be loaded 387 * @param startEndKeys starting and ending row keys of the region 388 */ 389 public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue, 390 Pair<byte[][], byte[][]> startEndKeys) throws IOException { 391 loadHFileQueue(table, conn, queue, startEndKeys, false); 392 } 393 394 /** 395 * Used by the replication sink to load the hfiles from the source cluster. It does the following, 396 * <ol> 397 * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li> 398 * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap) 399 * </li> 400 * </ol> 401 * @param table Table to which these hfiles should be loaded to 402 * @param conn Connection to use 403 * @param queue {@link LoadQueueItem} has hfiles yet to be loaded 404 * @param startEndKeys starting and ending row keys of the region 405 */ 406 public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue, 407 Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException { 408 ExecutorService pool = null; 409 try { 410 pool = createExecutorService(); 411 Multimap<ByteBuffer, LoadQueueItem> regionGroups = 412 groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst(); 413 bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null); 414 } finally { 415 if (pool != null) { 416 pool.shutdown(); 417 } 418 } 419 } 420 421 private Map<LoadQueueItem, ByteBuffer> performBulkLoad(Admin admin, Table table, 422 RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool, 423 SecureBulkLoadClient secureClient, boolean copyFile) throws IOException { 424 int count = 0; 425 426 if (isSecureBulkLoadEndpointAvailable()) { 427 LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases."); 428 LOG.warn("Secure bulk load has been integrated into HBase core."); 429 } 430 431 fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf())); 432 bulkToken = secureClient.prepareBulkLoad(admin.getConnection()); 433 Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null; 434 435 Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>(); 436 // Assumes that region splits can happen while this occurs. 437 while (!queue.isEmpty()) { 438 // need to reload split keys each iteration. 439 final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys(); 440 if (count != 0) { 441 LOG.info("Split occurred while grouping HFiles, retry attempt " + count + " with " 442 + queue.size() + " files remaining to group or split"); 443 } 444 445 int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10); 446 maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1); 447 if (maxRetries != 0 && count >= maxRetries) { 448 throw new IOException( 449 "Retry attempted " + count + " times without completing, bailing out"); 450 } 451 count++; 452 453 // Using ByteBuffer for byte[] equality semantics 454 pair = groupOrSplitPhase(table, pool, queue, startEndKeys); 455 Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst(); 456 457 if (!checkHFilesCountPerRegionPerFamily(regionGroups)) { 458 // Error is logged inside checkHFilesCountPerRegionPerFamily. 459 throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily 460 + " hfiles to one family of one region"); 461 } 462 463 bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile, 464 item2RegionMap); 465 466 // NOTE: The next iteration's split / group could happen in parallel to 467 // atomic bulkloads assuming that there are splits and no merges, and 468 // that we can atomically pull out the groups we want to retry. 469 } 470 471 if (!queue.isEmpty()) { 472 throw new RuntimeException( 473 "Bulk load aborted with some files not yet loaded." + "Please check log for more details."); 474 } 475 return item2RegionMap; 476 } 477 478 private Map<byte[], Collection<LoadQueueItem>> 479 groupByFamilies(Collection<LoadQueueItem> itemsInRegion) { 480 Map<byte[], Collection<LoadQueueItem>> families2Queue = new TreeMap<>(Bytes.BYTES_COMPARATOR); 481 itemsInRegion.forEach(item -> families2Queue 482 .computeIfAbsent(item.getFamily(), queue -> new ArrayList<>()).add(item)); 483 return families2Queue; 484 } 485 486 /** 487 * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures are 488 * re-queued for another pass with the groupOrSplitPhase. 489 * <p> 490 * protected for testing. 491 */ 492 @InterfaceAudience.Private 493 protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool, 494 Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile, 495 Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { 496 // atomically bulk load the groups. 497 Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>(); 498 for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e : regionGroups.asMap() 499 .entrySet()) { 500 byte[] first = e.getKey().array(); 501 Collection<LoadQueueItem> lqis = e.getValue(); 502 if (item2RegionMap != null) { 503 for (LoadQueueItem lqi : lqis) { 504 item2RegionMap.put(lqi, e.getKey()); 505 } 506 } 507 if (bulkLoadByFamily) { 508 groupByFamilies(lqis).values().forEach(familyQueue -> loadingFutures.add(pool 509 .submit(() -> tryAtomicRegionLoad(conn, table.getName(), first, familyQueue, copyFile)))); 510 } else { 511 loadingFutures.add( 512 pool.submit(() -> tryAtomicRegionLoad(conn, table.getName(), first, lqis, copyFile))); 513 } 514 } 515 516 // get all the results. 517 for (Future<List<LoadQueueItem>> future : loadingFutures) { 518 try { 519 List<LoadQueueItem> toRetry = future.get(); 520 521 if (item2RegionMap != null) { 522 for (LoadQueueItem lqi : toRetry) { 523 item2RegionMap.remove(lqi); 524 } 525 } 526 // LQIs that are requeued to be regrouped. 527 queue.addAll(toRetry); 528 529 } catch (ExecutionException e1) { 530 Throwable t = e1.getCause(); 531 if (t instanceof IOException) { 532 // At this point something unrecoverable has happened. 533 // TODO Implement bulk load recovery 534 throw new IOException("BulkLoad encountered an unrecoverable problem", t); 535 } 536 LOG.error("Unexpected execution exception during bulk load", e1); 537 throw new IllegalStateException(t); 538 } catch (InterruptedException e1) { 539 LOG.error("Unexpected interrupted exception during bulk load", e1); 540 throw (InterruptedIOException) new InterruptedIOException().initCause(e1); 541 } 542 } 543 } 544 545 @InterfaceAudience.Private 546 protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn, 547 TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) { 548 List<Pair<byte[], String>> famPaths = 549 lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString())) 550 .collect(Collectors.toList()); 551 return new ClientServiceCallable<byte[]>(conn, tableName, first, 552 rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET, Collections.emptyMap()) { 553 @Override 554 protected byte[] rpcCall() throws Exception { 555 SecureBulkLoadClient secureClient = null; 556 boolean success = false; 557 try { 558 if (LOG.isDebugEnabled()) { 559 LOG.debug("Going to connect to server " + getLocation() + " for row " 560 + Bytes.toStringBinary(getRow()) + " with hfile group " 561 + LoadIncrementalHFiles.this.toString(famPaths)); 562 } 563 byte[] regionName = getLocation().getRegionInfo().getRegionName(); 564 try (Table table = conn.getTable(getTableName())) { 565 secureClient = new SecureBulkLoadClient(getConf(), table); 566 success = 567 secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, assignSeqIds, 568 fsDelegationToken.getUserToken(), bulkToken, copyFile, clusterIds, replicate); 569 } 570 return success ? regionName : null; 571 } finally { 572 // Best effort copying of files that might not have been imported 573 // from the staging directory back to original location 574 // in user directory 575 if (secureClient != null && !success) { 576 FileSystem targetFs = FileSystem.get(getConf()); 577 FileSystem sourceFs = lqis.iterator().next().getFilePath().getFileSystem(getConf()); 578 // Check to see if the source and target filesystems are the same 579 // If they are the same filesystem, we will try move the files back 580 // because previously we moved them to the staging directory. 581 if (FSUtils.isSameHdfs(getConf(), sourceFs, targetFs)) { 582 for (Pair<byte[], String> el : famPaths) { 583 Path hfileStagingPath = null; 584 Path hfileOrigPath = new Path(el.getSecond()); 585 try { 586 hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())), 587 hfileOrigPath.getName()); 588 if (targetFs.rename(hfileStagingPath, hfileOrigPath)) { 589 LOG.debug("Moved back file " + hfileOrigPath + " from " + hfileStagingPath); 590 } else if (targetFs.exists(hfileStagingPath)) { 591 LOG.debug( 592 "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath); 593 } 594 } catch (Exception ex) { 595 LOG.debug( 596 "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath, ex); 597 } 598 } 599 } 600 } 601 } 602 } 603 }; 604 } 605 606 private boolean 607 checkHFilesCountPerRegionPerFamily(final Multimap<ByteBuffer, LoadQueueItem> regionGroups) { 608 for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) { 609 Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 610 for (LoadQueueItem lqi : e.getValue()) { 611 MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt()); 612 count.increment(); 613 if (count.intValue() > maxFilesPerRegionPerFamily) { 614 LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily + " hfiles to family " 615 + Bytes.toStringBinary(lqi.getFamily()) + " of region with start key " 616 + Bytes.toStringBinary(e.getKey())); 617 return false; 618 } 619 } 620 } 621 return true; 622 } 623 624 /** 625 * @param table the table to load into 626 * @param pool the ExecutorService 627 * @param queue the queue for LoadQueueItem 628 * @param startEndKeys start and end keys 629 * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles. 630 */ 631 private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase( 632 final Table table, ExecutorService pool, Deque<LoadQueueItem> queue, 633 final Pair<byte[][], byte[][]> startEndKeys) throws IOException { 634 // <region start key, LQI> need synchronized only within this scope of this 635 // phase because of the puts that happen in futures. 636 Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create(); 637 final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs); 638 Set<String> missingHFiles = new HashSet<>(); 639 Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = 640 new Pair<>(regionGroups, missingHFiles); 641 642 // drain LQIs and figure out bulk load groups 643 Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>(); 644 while (!queue.isEmpty()) { 645 final LoadQueueItem item = queue.remove(); 646 647 final Callable<Pair<List<LoadQueueItem>, String>> call = 648 new Callable<Pair<List<LoadQueueItem>, String>>() { 649 @Override 650 public Pair<List<LoadQueueItem>, String> call() throws Exception { 651 Pair<List<LoadQueueItem>, String> splits = 652 groupOrSplit(regionGroups, item, table, startEndKeys); 653 return splits; 654 } 655 }; 656 splittingFutures.add(pool.submit(call)); 657 } 658 // get all the results. All grouping and splitting must finish before 659 // we can attempt the atomic loads. 660 for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) { 661 try { 662 Pair<List<LoadQueueItem>, String> splits = lqis.get(); 663 if (splits != null) { 664 if (splits.getFirst() != null) { 665 queue.addAll(splits.getFirst()); 666 } else { 667 missingHFiles.add(splits.getSecond()); 668 } 669 } 670 } catch (ExecutionException e1) { 671 Throwable t = e1.getCause(); 672 if (t instanceof IOException) { 673 LOG.error("IOException during splitting", e1); 674 throw (IOException) t; // would have been thrown if not parallelized, 675 } 676 LOG.error("Unexpected execution exception during splitting", e1); 677 throw new IllegalStateException(t); 678 } catch (InterruptedException e1) { 679 LOG.error("Unexpected interrupted exception during splitting", e1); 680 throw (InterruptedIOException) new InterruptedIOException().initCause(e1); 681 } 682 } 683 return pair; 684 } 685 686 private List<LoadQueueItem> splitStoreFile(final LoadQueueItem item, final Table table, 687 byte[] startKey, byte[] splitKey) throws IOException { 688 Path hfilePath = item.getFilePath(); 689 byte[] family = item.getFamily(); 690 Path tmpDir = hfilePath.getParent(); 691 if (!tmpDir.getName().equals(TMP_DIR)) { 692 tmpDir = new Path(tmpDir, TMP_DIR); 693 } 694 695 LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. Splitting..."); 696 697 String uniqueName = getUniqueName(); 698 ColumnFamilyDescriptor familyDesc = table.getDescriptor().getColumnFamily(family); 699 700 Path botOut = new Path(tmpDir, uniqueName + ".bottom"); 701 Path topOut = new Path(tmpDir, uniqueName + ".top"); 702 splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut); 703 704 FileSystem fs = tmpDir.getFileSystem(getConf()); 705 fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx")); 706 fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx")); 707 fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx")); 708 709 // Add these back at the *front* of the queue, so there's a lower 710 // chance that the region will just split again before we get there. 711 List<LoadQueueItem> lqis = new ArrayList<>(2); 712 lqis.add(new LoadQueueItem(family, botOut)); 713 lqis.add(new LoadQueueItem(family, topOut)); 714 715 // If the current item is already the result of previous splits, 716 // we don't need it anymore. Clean up to save space. 717 // It is not part of the original input files. 718 try { 719 if (tmpDir.getName().equals(TMP_DIR)) { 720 fs.delete(hfilePath, false); 721 } 722 } catch (IOException e) { 723 LOG.warn("Unable to delete temporary split file " + hfilePath); 724 } 725 LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut); 726 return lqis; 727 } 728 729 /** 730 * @param startEndKeys the start/end keys of regions belong to this table, the list in ascending 731 * order by start key 732 * @param key the key need to find which region belong to 733 * @return region index 734 */ 735 private int getRegionIndex(final Pair<byte[][], byte[][]> startEndKeys, byte[] key) { 736 int idx = Arrays.binarySearch(startEndKeys.getFirst(), key, Bytes.BYTES_COMPARATOR); 737 if (idx < 0) { 738 // not on boundary, returns -(insertion index). Calculate region it 739 // would be in. 740 idx = -(idx + 1) - 1; 741 } 742 return idx; 743 } 744 745 /** 746 * we can consider there is a region hole in following conditions. 1) if idx < 0,then first region 747 * info is lost. 2) if the endkey of a region is not equal to the startkey of the next region. 3) 748 * if the endkey of the last region is not empty. 749 */ 750 private void checkRegionIndexValid(int idx, final Pair<byte[][], byte[][]> startEndKeys, 751 TableName tableName) throws IOException { 752 if (idx < 0) { 753 throw new IOException("The first region info for table " + tableName 754 + " can't be found in hbase:meta.Please use hbck tool to fix it first."); 755 } else if ( 756 (idx == startEndKeys.getFirst().length - 1) 757 && !Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY) 758 ) { 759 throw new IOException("The last region info for table " + tableName 760 + " can't be found in hbase:meta.Please use hbck tool to fix it first."); 761 } else if ( 762 idx + 1 < startEndKeys.getFirst().length 763 && !(Bytes.compareTo(startEndKeys.getSecond()[idx], startEndKeys.getFirst()[idx + 1]) == 0) 764 ) { 765 throw new IOException("The endkey of one region for table " + tableName 766 + " is not equal to the startkey of the next region in hbase:meta." 767 + "Please use hbck tool to fix it first."); 768 } 769 } 770 771 /** 772 * Attempt to assign the given load queue item into its target region group. If the hfile boundary 773 * no longer fits into a region, physically splits the hfile such that the new bottom half will 774 * fit and returns the list of LQI's corresponding to the resultant hfiles. 775 * <p> 776 * protected for testing 777 * @throws IOException if an IO failure is encountered 778 */ 779 @InterfaceAudience.Private 780 protected Pair<List<LoadQueueItem>, String> groupOrSplit( 781 Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table, 782 final Pair<byte[][], byte[][]> startEndKeys) throws IOException { 783 Path hfilePath = item.getFilePath(); 784 Optional<byte[]> first, last; 785 try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath, 786 CacheConfig.DISABLED, true, getConf())) { 787 first = hfr.getFirstRowKey(); 788 last = hfr.getLastRowKey(); 789 } catch (FileNotFoundException fnfe) { 790 LOG.debug("encountered", fnfe); 791 return new Pair<>(null, hfilePath.getName()); 792 } 793 794 LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary) 795 + " last=" + last.map(Bytes::toStringBinary)); 796 if (!first.isPresent() || !last.isPresent()) { 797 assert !first.isPresent() && !last.isPresent(); 798 // TODO what if this is due to a bad HFile? 799 LOG.info("hfile " + hfilePath + " has no entries, skipping"); 800 return null; 801 } 802 if (Bytes.compareTo(first.get(), last.get()) > 0) { 803 throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get()) 804 + " > " + Bytes.toStringBinary(last.get())); 805 } 806 807 int firstKeyRegionIdx = getRegionIndex(startEndKeys, first.get()); 808 checkRegionIndexValid(firstKeyRegionIdx, startEndKeys, table.getName()); 809 boolean lastKeyInRange = 810 Bytes.compareTo(last.get(), startEndKeys.getSecond()[firstKeyRegionIdx]) < 0 811 || Bytes.equals(startEndKeys.getSecond()[firstKeyRegionIdx], HConstants.EMPTY_BYTE_ARRAY); 812 if (!lastKeyInRange) { 813 if (failIfNeedSplitHFile) { 814 throw new IOException( 815 "The key range of hfile=" + hfilePath + " fits into no region. " + "And because " 816 + FAIL_IF_NEED_SPLIT_HFILE + " was set to true, we just skip the next steps."); 817 } 818 int lastKeyRegionIdx = getRegionIndex(startEndKeys, last.get()); 819 int splitIdx = (firstKeyRegionIdx + lastKeyRegionIdx) >>> 1; 820 // make sure the splitPoint is valid in case region overlap occur, maybe the splitPoint bigger 821 // than hfile.endkey w/o this check 822 if (splitIdx != firstKeyRegionIdx) { 823 checkRegionIndexValid(splitIdx, startEndKeys, table.getName()); 824 } 825 List<LoadQueueItem> lqis = splitStoreFile(item, table, 826 startEndKeys.getFirst()[firstKeyRegionIdx], startEndKeys.getSecond()[splitIdx]); 827 return new Pair<>(lqis, null); 828 } 829 830 // group regions. 831 regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[firstKeyRegionIdx]), item); 832 return null; 833 } 834 835 /** 836 * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of 837 * hfiles that need to be retried. If it is successful it will return an empty list. 838 * <p> 839 * NOTE: To maintain row atomicity guarantees, region server callable should succeed atomically 840 * and fails atomically. 841 * <p> 842 * Protected for testing. 843 * @return empty list if success, list of items to retry on recoverable failure 844 * @deprecated as of release 2.3.0. Use {@link BulkLoadHFiles} instead. 845 */ 846 @Deprecated 847 @InterfaceAudience.Private 848 protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn, 849 final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis, 850 boolean copyFile) throws IOException { 851 ClientServiceCallable<byte[]> serviceCallable = 852 buildClientServiceCallable(conn, tableName, first, lqis, copyFile); 853 return tryAtomicRegionLoad(serviceCallable, tableName, first, lqis); 854 } 855 856 /** 857 * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of 858 * hfiles that need to be retried. If it is successful it will return an empty list. 859 * <p> 860 * NOTE: To maintain row atomicity guarantees, region server callable should succeed atomically 861 * and fails atomically. 862 * <p> 863 * Protected for testing. 864 * @return empty list if success, list of items to retry on recoverable failure 865 * @deprecated as of release 2.3.0. Use {@link BulkLoadHFiles} instead. 866 */ 867 @Deprecated 868 @InterfaceAudience.Private 869 protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable, 870 final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis) 871 throws IOException { 872 List<LoadQueueItem> toRetry = new ArrayList<>(); 873 try { 874 Configuration conf = getConf(); 875 byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller() 876 .callWithRetries(serviceCallable, Integer.MAX_VALUE); 877 if (region == null) { 878 LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) 879 + " into table " + tableName + " with files " + lqis 880 + " failed. This is recoverable and they will be retried."); 881 toRetry.addAll(lqis); // return lqi's to retry 882 } 883 // success 884 return toRetry; 885 } catch (IOException e) { 886 LOG.error("Encountered unrecoverable error from region server, additional details: " 887 + serviceCallable.getExceptionMessageAdditionalDetail(), e); 888 LOG.warn("Received a " + e.getClass().getSimpleName() + " from region server: " 889 + serviceCallable.getExceptionMessageAdditionalDetail(), e); 890 if ( 891 getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) 892 && numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 893 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) 894 ) { 895 LOG.warn( 896 "Will attempt to retry loading failed HFiles. Retry #" + numRetries.incrementAndGet()); 897 toRetry.addAll(lqis); 898 return toRetry; 899 } 900 LOG.error(RETRY_ON_IO_EXCEPTION + " is disabled. Unable to recover"); 901 throw e; 902 } 903 } 904 905 /** 906 * If the table is created for the first time, then "completebulkload" reads the files twice. More 907 * modifications necessary if we want to avoid doing it. 908 */ 909 private void createTable(TableName tableName, Path hfofDir, Admin admin) throws IOException { 910 final FileSystem fs = hfofDir.getFileSystem(getConf()); 911 912 // Add column families 913 // Build a set of keys 914 List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>(); 915 SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 916 visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() { 917 @Override 918 public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) { 919 ColumnFamilyDescriptorBuilder builder = 920 ColumnFamilyDescriptorBuilder.newBuilder(familyName); 921 familyBuilders.add(builder); 922 return builder; 923 } 924 925 @Override 926 public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus) 927 throws IOException { 928 Path hfile = hfileStatus.getPath(); 929 try (HFile.Reader reader = 930 HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) { 931 if (builder.getCompressionType() != reader.getFileContext().getCompression()) { 932 builder.setCompressionType(reader.getFileContext().getCompression()); 933 LOG.info("Setting compression " + reader.getFileContext().getCompression().name() 934 + " for family " + builder.getNameAsString()); 935 } 936 byte[] first = reader.getFirstRowKey().get(); 937 byte[] last = reader.getLastRowKey().get(); 938 939 LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" 940 + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last)); 941 942 // To eventually infer start key-end key boundaries 943 Integer value = map.containsKey(first) ? map.get(first) : 0; 944 map.put(first, value + 1); 945 946 value = map.containsKey(last) ? map.get(last) : 0; 947 map.put(last, value - 1); 948 } 949 } 950 }); 951 952 byte[][] keys = inferBoundaries(map); 953 TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName); 954 familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build) 955 .forEachOrdered(tdBuilder::setColumnFamily); 956 admin.createTable(tdBuilder.build(), keys); 957 958 LOG.info("Table " + tableName + " is available!!"); 959 } 960 961 private void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool, 962 SecureBulkLoadClient secureClient) throws IOException { 963 fsDelegationToken.releaseDelegationToken(); 964 if (bulkToken != null && secureClient != null) { 965 secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken); 966 } 967 if (pool != null) { 968 pool.shutdown(); 969 } 970 if (!queue.isEmpty()) { 971 StringBuilder err = new StringBuilder(); 972 err.append("-------------------------------------------------\n"); 973 err.append("Bulk load aborted with some files not yet loaded:\n"); 974 err.append("-------------------------------------------------\n"); 975 for (LoadQueueItem q : queue) { 976 err.append(" ").append(q.getFilePath()).append('\n'); 977 } 978 LOG.error(err.toString()); 979 } 980 } 981 982 // unique file name for the table 983 private String getUniqueName() { 984 return UUID.randomUUID().toString().replaceAll("-", ""); 985 } 986 987 /** 988 * Checks whether there is any invalid family name in HFiles to be bulk loaded. 989 */ 990 private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence) 991 throws IOException { 992 Set<String> familyNames = Arrays.asList(table.getDescriptor().getColumnFamilies()).stream() 993 .map(f -> f.getNameAsString()).collect(Collectors.toSet()); 994 List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily())) 995 .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList()); 996 if (unmatchedFamilies.size() > 0) { 997 String msg = 998 "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " 999 + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: " 1000 + familyNames; 1001 LOG.error(msg); 1002 if (!silence) { 1003 throw new IOException(msg); 1004 } 1005 } 1006 } 1007 1008 /** 1009 * Populate the Queue with given HFiles 1010 */ 1011 private void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) { 1012 map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add)); 1013 } 1014 1015 /** 1016 * Walk the given directory for all HFiles, and return a Queue containing all such files. 1017 */ 1018 private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir, 1019 final boolean validateHFile) throws IOException { 1020 visitBulkHFiles(hfofDir.getFileSystem(getConf()), hfofDir, new BulkHFileVisitor<byte[]>() { 1021 @Override 1022 public byte[] bulkFamily(final byte[] familyName) { 1023 return familyName; 1024 } 1025 1026 @Override 1027 public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException { 1028 long length = hfile.getLen(); 1029 if ( 1030 length 1031 > getConf().getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE) 1032 ) { 1033 LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length 1034 + " bytes can be problematic as it may lead to oversplitting."); 1035 } 1036 ret.add(new LoadQueueItem(family, hfile.getPath())); 1037 } 1038 }, validateHFile); 1039 } 1040 1041 private interface BulkHFileVisitor<TFamily> { 1042 1043 TFamily bulkFamily(byte[] familyName) throws IOException; 1044 1045 void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException; 1046 } 1047 1048 /** 1049 * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_" and 1050 * non-valid hfiles. 1051 */ 1052 private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir, 1053 final BulkHFileVisitor<TFamily> visitor) throws IOException { 1054 visitBulkHFiles(fs, bulkDir, visitor, true); 1055 } 1056 1057 /** 1058 * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_". Check and 1059 * skip non-valid hfiles by default, or skip this validation by setting 1060 * 'hbase.loadincremental.validate.hfile' to false. 1061 */ 1062 private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir, 1063 BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException { 1064 FileStatus[] familyDirStatuses = fs.listStatus(bulkDir); 1065 for (FileStatus familyStat : familyDirStatuses) { 1066 if (!familyStat.isDirectory()) { 1067 LOG.warn("Skipping non-directory " + familyStat.getPath()); 1068 continue; 1069 } 1070 Path familyDir = familyStat.getPath(); 1071 byte[] familyName = Bytes.toBytes(familyDir.getName()); 1072 // Skip invalid family 1073 try { 1074 ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName); 1075 } catch (IllegalArgumentException e) { 1076 LOG.warn("Skipping invalid " + familyStat.getPath()); 1077 continue; 1078 } 1079 TFamily family = visitor.bulkFamily(familyName); 1080 1081 FileStatus[] hfileStatuses = fs.listStatus(familyDir); 1082 for (FileStatus hfileStatus : hfileStatuses) { 1083 if (!fs.isFile(hfileStatus.getPath())) { 1084 LOG.warn("Skipping non-file " + hfileStatus); 1085 continue; 1086 } 1087 1088 Path hfile = hfileStatus.getPath(); 1089 // Skip "_", reference, HFileLink 1090 String fileName = hfile.getName(); 1091 if (fileName.startsWith("_")) { 1092 continue; 1093 } 1094 if (StoreFileInfo.isReference(fileName)) { 1095 LOG.warn("Skipping reference " + fileName); 1096 continue; 1097 } 1098 if (HFileLink.isHFileLink(fileName)) { 1099 LOG.warn("Skipping HFileLink " + fileName); 1100 continue; 1101 } 1102 1103 // Validate HFile Format if needed 1104 if (validateHFile) { 1105 try { 1106 if (!HFile.isHFileFormat(fs, hfile)) { 1107 LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping"); 1108 continue; 1109 } 1110 } catch (FileNotFoundException e) { 1111 LOG.warn("the file " + hfile + " was removed"); 1112 continue; 1113 } 1114 } 1115 1116 visitor.bulkHFile(family, hfileStatus); 1117 } 1118 } 1119 } 1120 1121 // Initialize a thread pool 1122 private ExecutorService createExecutorService() { 1123 ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS, 1124 new LinkedBlockingQueue<>(), 1125 new ThreadFactoryBuilder().setNameFormat("LoadIncrementalHFiles-%1$d").build()); 1126 pool.allowCoreThreadTimeOut(true); 1127 return pool; 1128 } 1129 1130 private final String toString(List<Pair<byte[], String>> list) { 1131 StringBuilder sb = new StringBuilder(); 1132 sb.append('['); 1133 list.forEach(p -> { 1134 sb.append('{').append(Bytes.toStringBinary(p.getFirst())).append(',').append(p.getSecond()) 1135 .append('}'); 1136 }); 1137 sb.append(']'); 1138 return sb.toString(); 1139 } 1140 1141 private boolean isSecureBulkLoadEndpointAvailable() { 1142 String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); 1143 return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); 1144 } 1145 1146 /** 1147 * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom 1148 * filters, etc. 1149 */ 1150 @InterfaceAudience.Private 1151 static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc, 1152 byte[] splitKey, Path bottomOut, Path topOut) throws IOException { 1153 // Open reader with no block cache, and not in-memory 1154 Reference topReference = Reference.createTopReference(splitKey); 1155 Reference bottomReference = Reference.createBottomReference(splitKey); 1156 1157 copyHFileHalf(conf, inFile, topOut, topReference, familyDesc); 1158 copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc); 1159 } 1160 1161 /** 1162 * Copy half of an HFile into a new HFile. 1163 */ 1164 private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile, 1165 Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException { 1166 FileSystem fs = inFile.getFileSystem(conf); 1167 CacheConfig cacheConf = CacheConfig.DISABLED; 1168 HalfStoreFileReader halfReader = null; 1169 StoreFileWriter halfWriter = null; 1170 try { 1171 ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build(); 1172 StoreFileInfo storeFileInfo = 1173 new StoreFileInfo(conf, fs, fs.getFileStatus(inFile), reference); 1174 storeFileInfo.initHFileInfo(context); 1175 halfReader = (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf); 1176 storeFileInfo.getHFileInfo().initMetaAndIndex(halfReader.getHFileReader()); 1177 Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo(); 1178 1179 int blocksize = familyDescriptor.getBlocksize(); 1180 Algorithm compression = familyDescriptor.getCompressionType(); 1181 BloomType bloomFilterType = familyDescriptor.getBloomFilterType(); 1182 HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) 1183 .withChecksumType(StoreUtils.getChecksumType(conf)) 1184 .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize) 1185 .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true) 1186 .withCreateTime(EnvironmentEdgeManager.currentTime()).build(); 1187 halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) 1188 .withBloomType(bloomFilterType).withFileContext(hFileContext).build(); 1189 HFileScanner scanner = halfReader.getScanner(false, false, false); 1190 scanner.seekTo(); 1191 do { 1192 halfWriter.append(scanner.getCell()); 1193 } while (scanner.next()); 1194 1195 for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) { 1196 if (shouldCopyHFileMetaKey(entry.getKey())) { 1197 halfWriter.appendFileInfo(entry.getKey(), entry.getValue()); 1198 } 1199 } 1200 } finally { 1201 if (halfReader != null) { 1202 try { 1203 halfReader.close(cacheConf.shouldEvictOnClose()); 1204 } catch (IOException e) { 1205 LOG.warn("failed to close hfile reader for " + inFile, e); 1206 } 1207 } 1208 if (halfWriter != null) { 1209 halfWriter.close(); 1210 } 1211 1212 } 1213 } 1214 1215 private static boolean shouldCopyHFileMetaKey(byte[] key) { 1216 // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085 1217 if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) { 1218 return false; 1219 } 1220 1221 return !HFileInfo.isReservedFileInfoKey(key); 1222 } 1223 1224 private boolean isCreateTable() { 1225 return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes")); 1226 } 1227 1228 private boolean isSilence() { 1229 return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, "")); 1230 } 1231 1232 private boolean isAlwaysCopyFiles() { 1233 return getConf().getBoolean(ALWAYS_COPY_FILES, false); 1234 } 1235 1236 protected final Map<LoadQueueItem, ByteBuffer> run(Path hfofDir, TableName tableName) 1237 throws IOException { 1238 try (Connection connection = ConnectionFactory.createConnection(getConf()); 1239 Admin admin = connection.getAdmin()) { 1240 if (!admin.tableExists(tableName)) { 1241 if (isCreateTable()) { 1242 createTable(tableName, hfofDir, admin); 1243 } else { 1244 String errorMsg = format("Table '%s' does not exist.", tableName); 1245 LOG.error(errorMsg); 1246 throw new TableNotFoundException(errorMsg); 1247 } 1248 } 1249 try (Table table = connection.getTable(tableName); 1250 RegionLocator locator = connection.getRegionLocator(tableName)) { 1251 return doBulkLoad(hfofDir, admin, table, locator, isSilence(), isAlwaysCopyFiles()); 1252 } 1253 } 1254 } 1255 1256 /** 1257 * Perform bulk load on the given table. 1258 * @param hfofDir the directory that was provided as the output path of a job using 1259 * HFileOutputFormat 1260 * @param tableName the table to load into 1261 */ 1262 public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName) 1263 throws IOException { 1264 return run(new Path(hfofDir), tableName); 1265 } 1266 1267 /** 1268 * Perform bulk load on the given table. 1269 * @param family2Files map of family to List of hfiles 1270 * @param tableName the table to load into 1271 */ 1272 public Map<LoadQueueItem, ByteBuffer> run(Map<byte[], List<Path>> family2Files, 1273 TableName tableName) throws IOException { 1274 try (Connection connection = ConnectionFactory.createConnection(getConf()); 1275 Admin admin = connection.getAdmin()) { 1276 if (!admin.tableExists(tableName)) { 1277 String errorMsg = format("Table '%s' does not exist.", tableName); 1278 LOG.error(errorMsg); 1279 throw new TableNotFoundException(errorMsg); 1280 } 1281 try (Table table = connection.getTable(tableName); 1282 RegionLocator locator = connection.getRegionLocator(tableName)) { 1283 return doBulkLoad(family2Files, admin, table, locator, isSilence(), isAlwaysCopyFiles()); 1284 } 1285 } 1286 } 1287 1288 @Override 1289 public int run(String[] args) throws Exception { 1290 if (args.length != 2 && args.length != 3) { 1291 usage(); 1292 return -1; 1293 } 1294 // Re-initialize to apply -D options from the command line parameters 1295 initialize(); 1296 String dirPath = args[0]; 1297 TableName tableName = TableName.valueOf(args[1]); 1298 if (args.length == 2) { 1299 return !run(dirPath, tableName).isEmpty() ? 0 : -1; 1300 } else { 1301 Map<byte[], List<Path>> family2Files = Maps.newHashMap(); 1302 FileSystem fs = FileSystem.get(getConf()); 1303 for (FileStatus regionDir : fs.listStatus(new Path(dirPath))) { 1304 FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> { 1305 Path path = new Path(regionDir.getPath(), new Path(family, hfileName)); 1306 byte[] familyName = Bytes.toBytes(family); 1307 if (family2Files.containsKey(familyName)) { 1308 family2Files.get(familyName).add(path); 1309 } else { 1310 family2Files.put(familyName, Lists.newArrayList(path)); 1311 } 1312 }); 1313 } 1314 return !run(family2Files, tableName).isEmpty() ? 0 : -1; 1315 } 1316 1317 } 1318 1319 public static void main(String[] args) throws Exception { 1320 Configuration conf = HBaseConfiguration.create(); 1321 int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args); 1322 System.exit(ret); 1323 } 1324 1325 /** 1326 * Called from replication sink, where it manages bulkToken(staging directory) by itself. This is 1327 * used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes 1328 * property. This directory is used as a temporary directory where all files are initially 1329 * copied/moved from user given directory, set all the required file permissions and then from 1330 * their it is finally loaded into a table. This should be set only when, one would like to manage 1331 * the staging directory by itself. Otherwise this tool will handle this by itself. 1332 * @param stagingDir staging directory path 1333 */ 1334 public void setBulkToken(String stagingDir) { 1335 this.bulkToken = stagingDir; 1336 } 1337 1338 public void setClusterIds(List<String> clusterIds) { 1339 this.clusterIds = clusterIds; 1340 } 1341 1342 /** 1343 * Disables replication for these bulkloaded files. 1344 */ 1345 public void disableReplication() { 1346 this.replicate = false; 1347 } 1348 1349 /** 1350 * Infers region boundaries for a new table. 1351 * <p> 1352 * Parameter: <br> 1353 * bdryMap is a map between keys to an integer belonging to {+1, -1} 1354 * <ul> 1355 * <li>If a key is a start key of a file, then it maps to +1</li> 1356 * <li>If a key is an end key of a file, then it maps to -1</li> 1357 * </ul> 1358 * <p> 1359 * Algo:<br> 1360 * <ol> 1361 * <li>Poll on the keys in order: 1362 * <ol type="a"> 1363 * <li>Keep adding the mapped values to these keys (runningSum)</li> 1364 * <li>Each time runningSum reaches 0, add the start Key from when the runningSum had started to a 1365 * boundary list.</li> 1366 * </ol> 1367 * </li> 1368 * <li>Return the boundary list.</li> 1369 * </ol> 1370 */ 1371 public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) { 1372 List<byte[]> keysArray = new ArrayList<>(); 1373 int runningValue = 0; 1374 byte[] currStartKey = null; 1375 boolean firstBoundary = true; 1376 1377 for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) { 1378 if (runningValue == 0) { 1379 currStartKey = item.getKey(); 1380 } 1381 runningValue += item.getValue(); 1382 if (runningValue == 0) { 1383 if (!firstBoundary) { 1384 keysArray.add(currStartKey); 1385 } 1386 firstBoundary = false; 1387 } 1388 } 1389 1390 return keysArray.toArray(new byte[0][]); 1391 } 1392}