/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static java.lang.String.format;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Tool to load the output of HFileOutputFormat into an existing table.
 * <p/>
 * Note that, per the usual deprecation policy, this class would be kept until 4.0.0; however,
 * exposing an implementation class instead of an interface is a bad practice that we want to fix
 * as soon as possible, so this class will be removed completely in 3.0.0. Please change your code
 * to use {@link BulkLoadHFiles}.
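 * <p/>
 * A minimal usage sketch; the output directory and table name below are illustrative placeholders
 * rather than values required by the tool:
 *
 * <pre>
 * {@code
 * // command line, against the output directory of an HFileOutputFormat job:
 * //   bin/hbase completebulkload /path/to/hfileoutputformat-output mytable
 *
 * // or programmatically:
 * Configuration conf = HBaseConfiguration.create();
 * LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
 * loader.run("/path/to/hfileoutputformat-output", TableName.valueOf("mytable"));
 * }
 * </pre>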
 * @deprecated since 2.2.0, will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead. Please
 *             rewrite your code if you rely on methods other than the {@link #run(Map, TableName)}
 *             and {@link #run(String, TableName)}, as all the methods other than them will be
 *             removed with no replacement.
 */
@Deprecated
@InterfaceAudience.Public
public class LoadIncrementalHFiles extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(LoadIncrementalHFiles.class);

  /**
   * @deprecated since 2.2.0, will be removed in 3.0.0, with no replacement. End users should not
   *             depend on this value.
   */
  @Deprecated
  public static final String NAME = BulkLoadHFilesTool.NAME;
  static final String RETRY_ON_IO_EXCEPTION = BulkLoadHFiles.RETRY_ON_IO_EXCEPTION;
  public static final String MAX_FILES_PER_REGION_PER_FAMILY =
    BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY;
  private static final String ASSIGN_SEQ_IDS = BulkLoadHFiles.ASSIGN_SEQ_IDS;
  public final static String CREATE_TABLE_CONF_KEY = BulkLoadHFiles.CREATE_TABLE_CONF_KEY;
  public final static String IGNORE_UNMATCHED_CF_CONF_KEY =
    BulkLoadHFiles.IGNORE_UNMATCHED_CF_CONF_KEY;
  public final static String ALWAYS_COPY_FILES = BulkLoadHFiles.ALWAYS_COPY_FILES;

  public static final String FAIL_IF_NEED_SPLIT_HFILE =
    "hbase.loadincremental.fail.if.need.split.hfile";

  // We use a '.' prefix which is ignored when walking directory trees
  // above. It is not a valid family name.
  static final String TMP_DIR = ".tmp";

  private int maxFilesPerRegionPerFamily;
  private boolean assignSeqIds;
  private boolean bulkLoadByFamily;

  // Source delegation token
  private FsDelegationToken fsDelegationToken;
  private UserProvider userProvider;
  private int nrThreads;
  private AtomicInteger numRetries;
  private RpcControllerFactory rpcControllerFactory;

  private String bulkToken;

  private List<String> clusterIds = new ArrayList<>();

  private boolean replicate = true;

  private boolean failIfNeedSplitHFile = false;

  /**
   * Represents an HFile waiting to be loaded. A queue is used in this class in order to support
   * the case where a region has split during the process of the load. When this happens, the HFile
   * is split into two physical parts across the new region boundary, and each part is added back
   * into the queue. The import process finishes when the queue is empty.
   * @deprecated since 2.2.0 and will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead.
   * @see BulkLoadHFiles
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21782">HBASE-21782</a>
   */
  @InterfaceAudience.Public
  @Deprecated
  public static class LoadQueueItem extends BulkLoadHFiles.LoadQueueItem {

    public LoadQueueItem(byte[] family, Path hfilePath) {
      super(family, hfilePath);
    }
  }

  public LoadIncrementalHFiles(Configuration conf) {
    // make a copy, just to be sure we're not overriding someone else's config
    super(HBaseConfiguration.create(conf));
    initialize();
  }

  public void initialize() {
    Configuration conf = getConf();
    // disable blockcache for tool invocation, see HBASE-10500
    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
    maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
    bulkLoadByFamily = conf.getBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, false);
    failIfNeedSplitHFile = conf.getBoolean(FAIL_IF_NEED_SPLIT_HFILE, false);
    nrThreads =
      conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors());
    numRetries = new AtomicInteger(0);
    rpcControllerFactory = new RpcControllerFactory(conf);
  }

  private void usage() {
    System.err.println("Usage: " + "bin/hbase completebulkload [OPTIONS] "
      + "</PATH/TO/HFILEOUTPUTFORMAT-OUTPUT> <TABLENAME>\n"
      + "Loads directory of hfiles -- a region dir or product of HFileOutputFormat -- "
      + "into an hbase table.\n" + "OPTIONS (for other -D options, see source code):\n" + " -D"
      + CREATE_TABLE_CONF_KEY + "=no whether to create table; when 'no', target "
      + "table must exist.\n" + " -D" + IGNORE_UNMATCHED_CF_CONF_KEY
      + "=yes to ignore unmatched column families.\n"
      + " -loadTable for when directory of files to load has a depth of 3; target table must "
      + "exist;\n" + " must be last of the options on command line.\n"
      + "See http://hbase.apache.org/book.html#arch.bulk.load.complete.strays for "
      + "documentation.\n");
  }

  /**
   * Prepares a collection of {@link LoadQueueItem}s from the list of source hfiles contained in
   * the passed directory and validates whether the prepared queue has all the valid table column
   * families in it.
   * @param hfilesDir     directory containing list of hfiles to be loaded into the table
   * @param table         table to which hfiles should be loaded
   * @param queue         queue which needs to be loaded into the table
   * @param validateHFile if true, hfiles will be validated for their format
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
    boolean validateHFile) throws IOException {
    prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
  }

  /**
   * Prepares a collection of {@link LoadQueueItem}s from the list of source hfiles contained in
   * the passed directory and validates whether the prepared queue has all the valid table column
   * families in it.
   * @param hfilesDir     directory containing list of hfiles to be loaded into the table
   * @param table         table to which hfiles should be loaded
   * @param queue         queue which needs to be loaded into the table
   * @param validateHFile if true, hfiles will be validated for their format
   * @param silence       true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
    boolean validateHFile, boolean silence) throws IOException {
    discoverLoadQueue(queue, hfilesDir, validateHFile);
    validateFamiliesInHFiles(table, queue, silence);
  }

  /**
   * Prepares a collection of {@link LoadQueueItem}s from the given map of family to hfile paths
   * and validates whether the prepared queue has all the valid table column families in it.
   * @param map     map of family to List of hfiles
   * @param table   table to which hfiles should be loaded
   * @param queue   queue which needs to be loaded into the table
   * @param silence true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
    Deque<LoadQueueItem> queue, boolean silence) throws IOException {
    populateLoadQueue(queue, map);
    validateFamiliesInHFiles(table, queue, silence);
  }

  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method is
   * not threadsafe.
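   * <p>
   * A minimal calling sketch, mirroring what {@link #run(String, TableName)} does internally; the
   * directory and table name used here are illustrative placeholders:
   *
   * <pre>
   * {@code
   * try (Connection conn = ConnectionFactory.createConnection(conf);
   *   Admin admin = conn.getAdmin();
   *   Table table = conn.getTable(tableName);
   *   RegionLocator locator = conn.getRegionLocator(tableName)) {
   *   new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/bulk/output"), admin, table, locator);
   * }
   * }
   * </pre>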
   * @param hfofDir       the directory that was provided as the output path of a job using
   *                      HFileOutputFormat
   * @param admin         the Admin
   * @param table         the table to load into
   * @param regionLocator region locator
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
    RegionLocator regionLocator) throws TableNotFoundException, IOException {
    return doBulkLoad(hfofDir, admin, table, regionLocator, false, false);
  }

  /**
   * Perform a bulk load of the given map of family to hfile paths into the given pre-existing
   * table. This method is not threadsafe.
   * @param map           map of family to List of hfiles
   * @param admin         the Admin
   * @param table         the table to load into
   * @param regionLocator region locator
   * @param silence       true to ignore unmatched column families
   * @param copyFile      always copy hfiles if true
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin,
    Table table, RegionLocator regionLocator, boolean silence, boolean copyFile)
    throws TableNotFoundException, IOException {
    if (!admin.isTableAvailable(regionLocator.getName())) {
      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
    }
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    SecureBulkLoadClient secureClient = null;
    try {
      prepareHFileQueue(map, table, queue, silence);
      if (queue.isEmpty()) {
        LOG.warn("Bulk load operation did not get any files to load");
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
      return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
    } finally {
      cleanup(admin, queue, pool, secureClient);
    }
  }

  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method is
   * not threadsafe.
   * @param hfofDir       the directory that was provided as the output path of a job using
   *                      HFileOutputFormat
   * @param admin         the Admin
   * @param table         the table to load into
   * @param regionLocator region locator
   * @param silence       true to ignore unmatched column families
   * @param copyFile      always copy hfiles if true
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
    RegionLocator regionLocator, boolean silence, boolean copyFile)
    throws TableNotFoundException, IOException {
    if (!admin.isTableAvailable(regionLocator.getName())) {
      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
    }

    /*
     * Checking hfile format is a time-consuming operation, so we should have an option to skip
     * this step when bulkloading millions of HFiles. See HBASE-13985.
     */
    boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
    if (!validateHFile) {
      LOG.warn("You are skipping HFile validation, which might cause data loss if the files "
        + "are not correct. If you fail to read data from your table after using this "
        + "option, consider removing the files and bulkloading again without this option. "
        + "See HBASE-13985");
    }
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    SecureBulkLoadClient secureClient = null;
    try {
      prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);

      if (queue.isEmpty()) {
        LOG.warn(
          "Bulk load operation did not find any files to load in directory {}. "
            + "Does it contain files in subdirectories that correspond to column family names?",
          (hfofDir != null ? hfofDir.toUri().toString() : ""));
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
      return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
    } finally {
      cleanup(admin, queue, pool, secureClient);
    }
  }

  /**
   * Used by the replication sink to load the hfiles from the source cluster. It does the
   * following:
   * <ol>
   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)</li>
   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
   * </li>
   * </ol>
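   * <p>
   * A rough usage sketch; the {@code loader}, {@code family2Files}, {@code table}, {@code conn}
   * and {@code locator} variables below are illustrative placeholders:
   *
   * <pre>
   * {@code
   * Deque<LoadQueueItem> queue = new ArrayDeque<>();
   * loader.prepareHFileQueue(family2Files, table, queue, false);
   * loader.loadHFileQueue(table, conn, queue, locator.getStartEndKeys());
   * }
   * </pre>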
   * @param table        Table to which these hfiles should be loaded
   * @param conn         Connection to use
   * @param queue        queue of {@link LoadQueueItem}s holding the hfiles yet to be loaded
   * @param startEndKeys starting and ending row keys of the regions
   */
  public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
    Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    loadHFileQueue(table, conn, queue, startEndKeys, false);
  }

  /**
   * Used by the replication sink to load the hfiles from the source cluster. It does the
   * following:
   * <ol>
   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)</li>
   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
   * </li>
   * </ol>
   * @param table        Table to which these hfiles should be loaded
   * @param conn         Connection to use
   * @param queue        queue of {@link LoadQueueItem}s holding the hfiles yet to be loaded
   * @param startEndKeys starting and ending row keys of the regions
   * @param copyFile     always copy hfiles if true
   */
  public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
    Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
    ExecutorService pool = null;
    try {
      pool = createExecutorService();
      Multimap<ByteBuffer, LoadQueueItem> regionGroups =
        groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
      bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null);
    } finally {
      if (pool != null) {
        pool.shutdown();
      }
    }
  }

  private Map<LoadQueueItem, ByteBuffer> performBulkLoad(Admin admin, Table table,
    RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool,
    SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
    int count = 0;

    if (isSecureBulkLoadEndpointAvailable()) {
      LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
      LOG.warn("Secure bulk load has been integrated into HBase core.");
    }

    fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
    bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;

    Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
    // Assumes that region splits can happen while this occurs.
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
      final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
      if (count != 0) {
        LOG.info("Split occurred while grouping HFiles, retry attempt " + count + " with "
          + queue.size() + " files remaining to group or split");
      }

      int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
      maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException(
          "Retry attempted " + count + " times without completing, bailing out");
      }
      count++;

      // Using ByteBuffer for byte[] equality semantics
      pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
      Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();

      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
        // Error is logged inside checkHFilesCountPerRegionPerFamily.
        throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
          + " hfiles to one family of one region");
      }

      bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile,
        item2RegionMap);

      // NOTE: The next iteration's split / group could happen in parallel to
      // atomic bulkloads assuming that there are splits and no merges, and
      // that we can atomically pull out the groups we want to retry.
    }

    if (!queue.isEmpty()) {
      throw new RuntimeException(
        "Bulk load aborted with some files not yet loaded. Please check log for more details.");
    }
    return item2RegionMap;
  }

  private Map<byte[], Collection<LoadQueueItem>>
    groupByFamilies(Collection<LoadQueueItem> itemsInRegion) {
    Map<byte[], Collection<LoadQueueItem>> families2Queue = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    itemsInRegion.forEach(item -> families2Queue
      .computeIfAbsent(item.getFamily(), queue -> new ArrayList<>()).add(item));
    return families2Queue;
  }

  /**
   * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures are
   * re-queued for another pass with the groupOrSplitPhase.
   * <p>
   * protected for testing.
   */
  @InterfaceAudience.Private
  protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool,
    Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
    Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
    // atomically bulk load the groups.
    Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>();
    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e : regionGroups.asMap()
      .entrySet()) {
      byte[] first = e.getKey().array();
      Collection<LoadQueueItem> lqis = e.getValue();
      if (item2RegionMap != null) {
        for (LoadQueueItem lqi : lqis) {
          item2RegionMap.put(lqi, e.getKey());
        }
      }
      if (bulkLoadByFamily) {
        groupByFamilies(lqis).values().forEach(familyQueue -> loadingFutures.add(pool
          .submit(() -> tryAtomicRegionLoad(conn, table.getName(), first, familyQueue, copyFile))));
      } else {
        loadingFutures.add(
          pool.submit(() -> tryAtomicRegionLoad(conn, table.getName(), first, lqis, copyFile)));
      }
    }

    // get all the results.
    for (Future<List<LoadQueueItem>> future : loadingFutures) {
      try {
        List<LoadQueueItem> toRetry = future.get();

        if (item2RegionMap != null) {
          for (LoadQueueItem lqi : toRetry) {
            item2RegionMap.remove(lqi);
          }
        }
        // LQIs that are requeued to be regrouped.
        queue.addAll(toRetry);

      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          // At this point something unrecoverable has happened.
          // TODO Implement bulk load recovery
          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
        }
        LOG.error("Unexpected execution exception during bulk load", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during bulk load", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
  }

  @InterfaceAudience.Private
  protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn,
    TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
    List<Pair<byte[], String>> famPaths =
      lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
        .collect(Collectors.toList());
    return new ClientServiceCallable<byte[]>(conn, tableName, first,
      rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET, Collections.emptyMap()) {
      @Override
      protected byte[] rpcCall() throws Exception {
        SecureBulkLoadClient secureClient = null;
        boolean success = false;
        try {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Going to connect to server " + getLocation() + " for row "
              + Bytes.toStringBinary(getRow()) + " with hfile group "
              + LoadIncrementalHFiles.this.toString(famPaths));
          }
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          try (Table table = conn.getTable(getTableName())) {
            secureClient = new SecureBulkLoadClient(getConf(), table);
            success =
              secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, assignSeqIds,
                fsDelegationToken.getUserToken(), bulkToken, copyFile, clusterIds, replicate);
          }
          return success ? regionName : null;
        } finally {
          // Best effort copying of files that might not have been imported
          // from the staging directory back to original location
          // in user directory
          if (secureClient != null && !success) {
            FileSystem targetFs = FileSystem.get(getConf());
            FileSystem sourceFs = lqis.iterator().next().getFilePath().getFileSystem(getConf());
            // Check to see if the source and target filesystems are the same
            // If they are the same filesystem, we will try to move the files back
            // because previously we moved them to the staging directory.
            if (FSUtils.isSameHdfs(getConf(), sourceFs, targetFs)) {
              for (Pair<byte[], String> el : famPaths) {
                Path hfileStagingPath = null;
                Path hfileOrigPath = new Path(el.getSecond());
                try {
                  hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())),
                    hfileOrigPath.getName());
                  if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
                    LOG.debug("Moved back file " + hfileOrigPath + " from " + hfileStagingPath);
                  } else if (targetFs.exists(hfileStagingPath)) {
                    LOG.debug(
                      "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath);
                  }
                } catch (Exception ex) {
                  LOG.debug(
                    "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath, ex);
                }
              }
            }
          }
        }
      }
    };
  }

  private boolean
    checkHFilesCountPerRegionPerFamily(final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
    for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
      Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (LoadQueueItem lqi : e.getValue()) {
        MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt());
        count.increment();
        if (count.intValue() > maxFilesPerRegionPerFamily) {
          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily + " hfiles to family "
            + Bytes.toStringBinary(lqi.getFamily()) + " of region with start key "
            + Bytes.toStringBinary(e.getKey()));
          return false;
        }
      }
    }
    return true;
  }

  /**
   * @param table        the table to load into
   * @param pool         the ExecutorService
   * @param queue        the queue for LoadQueueItem
   * @param startEndKeys start and end keys
   * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles.
   */
  private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
    final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
    final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    // <region start key, LQI> need to be synchronized only within the scope of this
    // phase because of the puts that happen in futures.
    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
    Set<String> missingHFiles = new HashSet<>();
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair =
      new Pair<>(regionGroups, missingHFiles);

    // drain LQIs and figure out bulk load groups
    Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
    while (!queue.isEmpty()) {
      final LoadQueueItem item = queue.remove();

      final Callable<Pair<List<LoadQueueItem>, String>> call =
        new Callable<Pair<List<LoadQueueItem>, String>>() {
          @Override
          public Pair<List<LoadQueueItem>, String> call() throws Exception {
            Pair<List<LoadQueueItem>, String> splits =
              groupOrSplit(regionGroups, item, table, startEndKeys);
            return splits;
          }
        };
      splittingFutures.add(pool.submit(call));
    }
    // get all the results. All grouping and splitting must finish before
    // we can attempt the atomic loads.
    for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
      try {
        Pair<List<LoadQueueItem>, String> splits = lqis.get();
        if (splits != null) {
          if (splits.getFirst() != null) {
            queue.addAll(splits.getFirst());
          } else {
            missingHFiles.add(splits.getSecond());
          }
        }
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          LOG.error("IOException during splitting", e1);
          throw (IOException) t; // would have been thrown if not parallelized,
        }
        LOG.error("Unexpected execution exception during splitting", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during splitting", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
    return pair;
  }

  private List<LoadQueueItem> splitStoreFile(final LoadQueueItem item, final Table table,
    byte[] startKey, byte[] splitKey) throws IOException {
    Path hfilePath = item.getFilePath();
    byte[] family = item.getFamily();
    Path tmpDir = hfilePath.getParent();
    if (!tmpDir.getName().equals(TMP_DIR)) {
      tmpDir = new Path(tmpDir, TMP_DIR);
    }

    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. Splitting...");

    String uniqueName = getUniqueName();
    ColumnFamilyDescriptor familyDesc = table.getDescriptor().getColumnFamily(family);

    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
    Path topOut = new Path(tmpDir, uniqueName + ".top");
    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);

    FileSystem fs = tmpDir.getFileSystem(getConf());
    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));

    // Add these back at the *front* of the queue, so there's a lower
    // chance that the region will just split again before we get there.
    List<LoadQueueItem> lqis = new ArrayList<>(2);
    lqis.add(new LoadQueueItem(family, botOut));
    lqis.add(new LoadQueueItem(family, topOut));

    // If the current item is already the result of previous splits,
    // we don't need it anymore. Clean up to save space.
    // It is not part of the original input files.
    try {
      if (tmpDir.getName().equals(TMP_DIR)) {
        fs.delete(hfilePath, false);
      }
    } catch (IOException e) {
      LOG.warn("Unable to delete temporary split file " + hfilePath);
    }
    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
    return lqis;
  }

  /**
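   * Find the index of the region that a given row key falls into, using a binary search over the
   * region start keys. As an illustrative example (not taken from the codebase): with start keys
   * {"", "b", "d"}, looking up the key "c" makes binarySearch return -3, which the negative branch
   * below converts to index 1, i.e. the region starting at "b".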
   * @param startEndKeys the start/end keys of the regions belonging to this table, in ascending
   *                     order by start key
   * @param key          the key for which to find the containing region
   * @return region index
   */
  private int getRegionIndex(final Pair<byte[][], byte[][]> startEndKeys, byte[] key) {
    int idx = Arrays.binarySearch(startEndKeys.getFirst(), key, Bytes.BYTES_COMPARATOR);
    if (idx < 0) {
      // key is not a region start key: binarySearch returns -(insertion point) - 1.
      // Calculate the region the key would be in.
      idx = -(idx + 1) - 1;
    }
    return idx;
  }

  /**
   * We consider there to be a region hole under any of the following conditions: 1) if idx < 0,
   * then the first region info is missing; 2) if the end key of a region is not equal to the start
   * key of the next region; 3) if the end key of the last region is not empty.
   */
  private void checkRegionIndexValid(int idx, final Pair<byte[][], byte[][]> startEndKeys,
    TableName tableName) throws IOException {
    if (idx < 0) {
      throw new IOException("The first region info for table " + tableName
        + " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if (
      (idx == startEndKeys.getFirst().length - 1)
        && !Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY)
    ) {
      throw new IOException("The last region info for table " + tableName
        + " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if (
      idx + 1 < startEndKeys.getFirst().length
        && !(Bytes.compareTo(startEndKeys.getSecond()[idx], startEndKeys.getFirst()[idx + 1]) == 0)
    ) {
      throw new IOException("The endkey of one region for table " + tableName
        + " is not equal to the startkey of the next region in hbase:meta."
        + " Please use hbck tool to fix it first.");
    }
  }

  /**
   * Attempt to assign the given load queue item into its target region group. If the hfile boundary
   * no longer fits into a region, physically splits the hfile such that the new bottom half will
   * fit and returns the list of LQI's corresponding to the resultant hfiles.
   * <p>
   * protected for testing
   * @throws IOException if an IO failure is encountered
   */
  @InterfaceAudience.Private
  protected Pair<List<LoadQueueItem>, String> groupOrSplit(
    Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
    final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    Path hfilePath = item.getFilePath();
    Optional<byte[]> first, last;
    try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
      CacheConfig.DISABLED, true, getConf())) {
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } catch (FileNotFoundException fnfe) {
      LOG.debug("encountered", fnfe);
      return new Pair<>(null, hfilePath.getName());
    }

    LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary)
      + " last=" + last.map(Bytes::toStringBinary));
    if (!first.isPresent() || !last.isPresent()) {
      assert !first.isPresent() && !last.isPresent();
      // TODO what if this is due to a bad HFile?
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return null;
    }
    if (Bytes.compareTo(first.get(), last.get()) > 0) {
      throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get())
        + " > " + Bytes.toStringBinary(last.get()));
    }

    int firstKeyRegionIdx = getRegionIndex(startEndKeys, first.get());
    checkRegionIndexValid(firstKeyRegionIdx, startEndKeys, table.getName());
    boolean lastKeyInRange =
      Bytes.compareTo(last.get(), startEndKeys.getSecond()[firstKeyRegionIdx]) < 0
        || Bytes.equals(startEndKeys.getSecond()[firstKeyRegionIdx], HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      if (failIfNeedSplitHFile) {
        throw new IOException(
          "The key range of hfile=" + hfilePath + " fits into no region. " + "And because "
            + FAIL_IF_NEED_SPLIT_HFILE + " was set to true, we just skip the next steps.");
      }
      int lastKeyRegionIdx = getRegionIndex(startEndKeys, last.get());
      int splitIdx = (firstKeyRegionIdx + lastKeyRegionIdx) >>> 1;
      // make sure the splitPoint is valid in case region overlaps occur; without this check the
      // splitPoint might be bigger than the hfile's end key
      if (splitIdx != firstKeyRegionIdx) {
        checkRegionIndexValid(splitIdx, startEndKeys, table.getName());
      }
      List<LoadQueueItem> lqis = splitStoreFile(item, table,
        startEndKeys.getFirst()[firstKeyRegionIdx], startEndKeys.getSecond()[splitIdx]);
      return new Pair<>(lqis, null);
    }

    // group regions.
    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[firstKeyRegionIdx]), item);
    return null;
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
   * hfiles that need to be retried. If it is successful it will return an empty list.
   * <p>
   * NOTE: To maintain row atomicity guarantees, region server callable should succeed atomically
   * and fail atomically.
   * <p>
   * Protected for testing.
   * @return empty list if success, list of items to retry on recoverable failure
   * @deprecated as of release 2.3.0. Use {@link BulkLoadHFiles} instead.
   */
  @Deprecated
  @InterfaceAudience.Private
  protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
    final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis,
    boolean copyFile) throws IOException {
    ClientServiceCallable<byte[]> serviceCallable =
      buildClientServiceCallable(conn, tableName, first, lqis, copyFile);
    return tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
   * hfiles that need to be retried. If it is successful it will return an empty list.
   * <p>
   * NOTE: To maintain row atomicity guarantees, region server callable should succeed atomically
   * and fail atomically.
   * <p>
   * Protected for testing.
   * @return empty list if success, list of items to retry on recoverable failure
   * @deprecated as of release 2.3.0. Use {@link BulkLoadHFiles} instead.
   */
  @Deprecated
  @InterfaceAudience.Private
  protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
    final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
    throws IOException {
    List<LoadQueueItem> toRetry = new ArrayList<>();
    try {
      Configuration conf = getConf();
      byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
        .callWithRetries(serviceCallable, Integer.MAX_VALUE);
      if (region == null) {
        LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first)
          + " into table " + tableName + " with files " + lqis
          + " failed.  This is recoverable and they will be retried.");
        toRetry.addAll(lqis); // return lqi's to retry
      }
      // success
      return toRetry;
    } catch (IOException e) {
      LOG.error("Encountered unrecoverable error from region server, additional details: "
        + serviceCallable.getExceptionMessageAdditionalDetail(), e);
      LOG.warn("Received a " + e.getClass().getSimpleName() + " from region server: "
        + serviceCallable.getExceptionMessageAdditionalDetail(), e);
      if (
        getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
          && numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)
      ) {
        LOG.warn(
          "Will attempt to retry loading failed HFiles. Retry #" + numRetries.incrementAndGet());
        toRetry.addAll(lqis);
        return toRetry;
      }
      LOG.error(RETRY_ON_IO_EXCEPTION + " is disabled. Unable to recover");
      throw e;
    }
  }

  /**
   * If the table is created for the first time, then "completebulkload" reads the files twice. More
   * modifications are necessary if we want to avoid doing it.
   */
  private void createTable(TableName tableName, Path hfofDir, Admin admin) throws IOException {
    final FileSystem fs = hfofDir.getFileSystem(getConf());

    // Add column families
    // Build a set of keys
    List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>();
    SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() {
      @Override
      public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) {
        ColumnFamilyDescriptorBuilder builder =
          ColumnFamilyDescriptorBuilder.newBuilder(familyName);
        familyBuilders.add(builder);
        return builder;
      }

      @Override
      public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus)
        throws IOException {
        Path hfile = hfileStatus.getPath();
        try (HFile.Reader reader =
          HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) {
          if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
            builder.setCompressionType(reader.getFileContext().getCompression());
            LOG.info("Setting compression " + reader.getFileContext().getCompression().name()
              + " for family " + builder.getNameAsString());
          }
          byte[] first = reader.getFirstRowKey().get();
          byte[] last = reader.getLastRowKey().get();

          LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first="
            + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));

          // To eventually infer start key-end key boundaries
          Integer value = map.containsKey(first) ? map.get(first) : 0;
          map.put(first, value + 1);

          value = map.containsKey(last) ? map.get(last) : 0;
          map.put(last, value - 1);
        }
      }
    });

    byte[][] keys = inferBoundaries(map);
    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
    familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build)
      .forEachOrdered(tdBuilder::setColumnFamily);
    admin.createTable(tdBuilder.build(), keys);

    LOG.info("Table " + tableName + " is available!!");
  }

  private void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
    SecureBulkLoadClient secureClient) throws IOException {
    fsDelegationToken.releaseDelegationToken();
    if (bulkToken != null && secureClient != null) {
      secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
    }
    if (pool != null) {
      pool.shutdown();
    }
    if (!queue.isEmpty()) {
      StringBuilder err = new StringBuilder();
      err.append("-------------------------------------------------\n");
      err.append("Bulk load aborted with some files not yet loaded:\n");
      err.append("-------------------------------------------------\n");
      for (LoadQueueItem q : queue) {
        err.append("  ").append(q.getFilePath()).append('\n');
      }
      LOG.error(err.toString());
    }
  }

  // unique file name for the table
  private String getUniqueName() {
    return UUID.randomUUID().toString().replaceAll("-", "");
  }

  /**
   * Checks whether there is any invalid family name in HFiles to be bulk loaded.
   */
  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
    throws IOException {
    Set<String> familyNames = Arrays.asList(table.getDescriptor().getColumnFamilies()).stream()
      .map(f -> f.getNameAsString()).collect(Collectors.toSet());
    List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily()))
      .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList());
    if (unmatchedFamilies.size() > 0) {
      String msg =
        "Unmatched family names found in HFiles to be bulkloaded: "
          + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
          + familyNames;
      LOG.error(msg);
      if (!silence) {
        throw new IOException(msg);
      }
    }
  }

  /**
   * Populate the Queue with given HFiles
   */
  private void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) {
    map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add));
  }

  /**
   * Walk the given directory for all HFiles, and return a Queue containing all such files.
   */
  private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
    final boolean validateHFile) throws IOException {
    visitBulkHFiles(hfofDir.getFileSystem(getConf()), hfofDir, new BulkHFileVisitor<byte[]>() {
      @Override
      public byte[] bulkFamily(final byte[] familyName) {
        return familyName;
      }

      @Override
      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
        long length = hfile.getLen();
        if (
          length
              > getConf().getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)
        ) {
          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length
            + " bytes can be problematic as it may lead to oversplitting.");
        }
        ret.add(new LoadQueueItem(family, hfile.getPath()));
      }
    }, validateHFile);
  }

  private interface BulkHFileVisitor<TFamily> {

    TFamily bulkFamily(byte[] familyName) throws IOException;

    void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException;
  }

  /**
   * Iterate over the bulkDir hfiles. Skip references, HFileLinks, files starting with "_", and
   * invalid hfiles.
   */
  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
    final BulkHFileVisitor<TFamily> visitor) throws IOException {
    visitBulkHFiles(fs, bulkDir, visitor, true);
  }

  /**
   * Iterate over the bulkDir hfiles. Skip references, HFileLinks, and files starting with "_".
   * Check and skip invalid hfiles by default, or skip this validation by setting
   * 'hbase.loadincremental.validate.hfile' to false.
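   * <p>
   * The expected layout is a directory of column family subdirectories, each containing hfiles;
   * the family and file names below are just illustrative examples:
   *
   * <pre>
   * bulkDir/
   *   cf1/hfile1
   *   cf1/hfile2
   *   cf2/hfile3
   * </pre>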
   */
  private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
    BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
    for (FileStatus familyStat : familyDirStatuses) {
      if (!familyStat.isDirectory()) {
        LOG.warn("Skipping non-directory " + familyStat.getPath());
        continue;
      }
      Path familyDir = familyStat.getPath();
      byte[] familyName = Bytes.toBytes(familyDir.getName());
      // Skip invalid family
      try {
        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName);
      } catch (IllegalArgumentException e) {
        LOG.warn("Skipping invalid " + familyStat.getPath());
        continue;
      }
      TFamily family = visitor.bulkFamily(familyName);

      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
      for (FileStatus hfileStatus : hfileStatuses) {
        if (!fs.isFile(hfileStatus.getPath())) {
          LOG.warn("Skipping non-file " + hfileStatus);
          continue;
        }

        Path hfile = hfileStatus.getPath();
        // Skip "_", reference, HFileLink
        String fileName = hfile.getName();
        if (fileName.startsWith("_")) {
          continue;
        }
        if (StoreFileInfo.isReference(fileName)) {
          LOG.warn("Skipping reference " + fileName);
          continue;
        }
        if (HFileLink.isHFileLink(fileName)) {
          LOG.warn("Skipping HFileLink " + fileName);
          continue;
        }

        // Validate HFile Format if needed
        if (validateHFile) {
          try {
            if (!HFile.isHFileFormat(fs, hfile)) {
              LOG.warn("the file " + hfile + " doesn't seem to be an hfile, skipping");
1108              continue;
1109            }
1110          } catch (FileNotFoundException e) {
1111            LOG.warn("the file " + hfile + " was removed");
1112            continue;
1113          }
1114        }
1115
1116        visitor.bulkHFile(family, hfileStatus);
1117      }
1118    }
1119  }
1120
1121  // Initialize a thread pool
1122  private ExecutorService createExecutorService() {
1123    ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
1124      new LinkedBlockingQueue<>(),
1125      new ThreadFactoryBuilder().setNameFormat("LoadIncrementalHFiles-%1$d").build());
1126    pool.allowCoreThreadTimeOut(true);
1127    return pool;
1128  }
1129
1130  private final String toString(List<Pair<byte[], String>> list) {
1131    StringBuilder sb = new StringBuilder();
1132    sb.append('[');
1133    list.forEach(p -> {
1134      sb.append('{').append(Bytes.toStringBinary(p.getFirst())).append(',').append(p.getSecond())
1135        .append('}');
1136    });
1137    sb.append(']');
1138    return sb.toString();
1139  }
1140
1141  private boolean isSecureBulkLoadEndpointAvailable() {
1142    String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
1143    return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
1144  }
1145
1146  /**
1147   * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
1148   * filters, etc.
1149   */
1150  @InterfaceAudience.Private
1151  static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
1152    byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
1153    // Open reader with no block cache, and not in-memory
1154    Reference topReference = Reference.createTopReference(splitKey);
1155    Reference bottomReference = Reference.createBottomReference(splitKey);
1156
1157    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
1158    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
1159  }
1160
1161  /**
1162   * Copy half of an HFile into a new HFile.
1163   */
1164  private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
1165    Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
1166    FileSystem fs = inFile.getFileSystem(conf);
1167    CacheConfig cacheConf = CacheConfig.DISABLED;
1168    HalfStoreFileReader halfReader = null;
1169    StoreFileWriter halfWriter = null;
1170    try {
1171      ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build();
1172      StoreFileInfo storeFileInfo =
1173        new StoreFileInfo(conf, fs, fs.getFileStatus(inFile), reference);
1174      storeFileInfo.initHFileInfo(context);
1175      halfReader = (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
1176      storeFileInfo.getHFileInfo().initMetaAndIndex(halfReader.getHFileReader());
1177      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
1178
1179      int blocksize = familyDescriptor.getBlocksize();
1180      Algorithm compression = familyDescriptor.getCompressionType();
1181      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
1182      HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
1183        .withChecksumType(StoreUtils.getChecksumType(conf))
1184        .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize)
1185        .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
1186        .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
1187      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
1188        .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
      HFileScanner scanner = halfReader.getScanner(false, false, false);
      scanner.seekTo();
      do {
        halfWriter.append(scanner.getCell());
      } while (scanner.next());

      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfReader != null) {
        try {
          halfReader.close(cacheConf.shouldEvictOnClose());
        } catch (IOException e) {
          LOG.warn("failed to close hfile reader for " + inFile, e);
        }
      }
      if (halfWriter != null) {
        halfWriter.close();
      }
    }
  }

  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
    if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
      return false;
    }

    return !HFileInfo.isReservedFileInfoKey(key);
  }

  private boolean isCreateTable() {
    return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"));
  }

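  /**
   * Returns true when unmatched column families should be ignored, i.e. when "yes" was supplied
   * for the {@code IGNORE_UNMATCHED_CF_CONF_KEY} setting.
   */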
  private boolean isSilence() {
    return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
  }

  private boolean isAlwaysCopyFiles() {
    return getConf().getBoolean(ALWAYS_COPY_FILES, false);
  }

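  /**
   * Perform a bulk load of the given directory into the given table, creating the table first if
   * it does not exist and table creation is enabled (the default for
   * {@code CREATE_TABLE_CONF_KEY}).
   */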
  protected final Map<LoadQueueItem, ByteBuffer> run(Path hfofDir, TableName tableName)
    throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(getConf());
      Admin admin = connection.getAdmin()) {
      if (!admin.tableExists(tableName)) {
        if (isCreateTable()) {
          createTable(tableName, hfofDir, admin);
        } else {
          String errorMsg = format("Table '%s' does not exist.", tableName);
          LOG.error(errorMsg);
          throw new TableNotFoundException(errorMsg);
        }
      }
      try (Table table = connection.getTable(tableName);
        RegionLocator locator = connection.getRegionLocator(tableName)) {
        return doBulkLoad(hfofDir, admin, table, locator, isSilence(), isAlwaysCopyFiles());
      }
    }
  }

  /**
   * Perform bulk load on the given table.
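   * <p>
   * A minimal usage sketch (the path and table name below are hypothetical):
   *
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
   * loader.run("hdfs:///user/alice/hfile-output", TableName.valueOf("my_table"));
   * </pre>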
   * @param hfofDir   the directory that was provided as the output path of a job using
   *                  HFileOutputFormat
   * @param tableName the table to load into
   */
  public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
    throws IOException {
    return run(new Path(hfofDir), tableName);
  }

  /**
   * Perform bulk load on the given table.
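   * <p>
   * A minimal sketch of building the input map (family name and path are hypothetical):
   *
   * <pre>
   * Map&lt;byte[], List&lt;Path&gt;&gt; family2Files = new TreeMap&lt;&gt;(Bytes.BYTES_COMPARATOR);
   * family2Files.put(Bytes.toBytes("cf"), Arrays.asList(new Path("hdfs:///staging/cf/hfile1")));
   * new LoadIncrementalHFiles(conf).run(family2Files, TableName.valueOf("my_table"));
   * </pre>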
   * @param family2Files map of family to List of hfiles
   * @param tableName    the table to load into
   */
  public Map<LoadQueueItem, ByteBuffer> run(Map<byte[], List<Path>> family2Files,
    TableName tableName) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(getConf());
      Admin admin = connection.getAdmin()) {
      if (!admin.tableExists(tableName)) {
        String errorMsg = format("Table '%s' does not exist.", tableName);
        LOG.error(errorMsg);
        throw new TableNotFoundException(errorMsg);
      }
      try (Table table = connection.getTable(tableName);
        RegionLocator locator = connection.getRegionLocator(tableName)) {
        return doBulkLoad(family2Files, admin, table, locator, isSilence(), isAlwaysCopyFiles());
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2 && args.length != 3) {
      usage();
      return -1;
    }
    // Re-initialize to apply -D options from the command line parameters
    initialize();
    String dirPath = args[0];
    TableName tableName = TableName.valueOf(args[1]);
    if (args.length == 2) {
      return !run(dirPath, tableName).isEmpty() ? 0 : -1;
    } else {
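      // Three-argument form: dirPath is a table-level directory whose region subdirectories are
      // walked to build the family -> hfiles map before loading.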
      Map<byte[], List<Path>> family2Files = Maps.newHashMap();
      FileSystem fs = FileSystem.get(getConf());
      for (FileStatus regionDir : fs.listStatus(new Path(dirPath))) {
        FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
          Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
          byte[] familyName = Bytes.toBytes(family);
          if (family2Files.containsKey(familyName)) {
            family2Files.get(familyName).add(path);
          } else {
            family2Files.put(familyName, Lists.newArrayList(path));
          }
        });
      }
      return !run(family2Files, tableName).isEmpty() ? 0 : -1;
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args);
    System.exit(ret);
  }

  /**
   * Called from the replication sink, which manages the bulkToken (staging directory) itself. This
   * is used only when SecureBulkLoadEndpoint is configured in the
   * hbase.coprocessor.region.classes property. The staging directory is a temporary location into
   * which all files are first copied/moved from the user-supplied directory and given the required
   * file permissions before they are finally loaded into the table. Set this only if you want to
   * manage the staging directory yourself; otherwise this tool handles it on its own.
   * @param stagingDir staging directory path
   */
  public void setBulkToken(String stagingDir) {
    this.bulkToken = stagingDir;
  }

  public void setClusterIds(List<String> clusterIds) {
    this.clusterIds = clusterIds;
  }

  /**
   * Disables replication for these bulk loaded files.
   */
  public void disableReplication() {
    this.replicate = false;
  }

  /**
   * Infers region boundaries for a new table.
   * <p>
   * Parameter:<br>
   * bdryMap is a map from each key to an integer in {+1, -1}:
   * <ul>
   * <li>If a key is the start key of a file, it maps to +1</li>
   * <li>If a key is the end key of a file, it maps to -1</li>
   * </ul>
   * <p>
   * Algorithm:<br>
   * <ol>
   * <li>Iterate over the keys in sorted order:
   * <ol type="a">
   * <li>Keep adding the mapped values to a running sum (runningSum)</li>
   * <li>Each time runningSum returns to 0, add the key at which the current run started to the
   * boundary list; the very first run is skipped.</li>
   * </ol>
   * </li>
   * <li>Return the boundary list.</li>
   * </ol>
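   * <p>
   * Worked example (hypothetical keys): two non-overlapping files covering [a,c] and [d,f] yield
   * the map {a=+1, c=-1, d=+1, f=-1}. The running sum returns to zero at c (the first run, which
   * is skipped) and again at f, so the start key of the second run, d, becomes the single inferred
   * boundary and the new table is pre-split into two regions.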
   */
  public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) {
    List<byte[]> keysArray = new ArrayList<>();
    int runningValue = 0;
    byte[] currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
      if (runningValue == 0) {
        currStartKey = item.getKey();
      }
      runningValue += item.getValue();
      if (runningValue == 0) {
        if (!firstBoundary) {
          keysArray.add(currStartKey);
        }
        firstBoundary = false;
      }
    }

    return keysArray.toArray(new byte[0][]);
  }
}