001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.tool;
019
020import static java.lang.String.format;
021
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.net.InetSocketAddress;
026import java.nio.ByteBuffer;
027import java.util.ArrayDeque;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collection;
031import java.util.Collections;
032import java.util.Deque;
033import java.util.HashMap;
034import java.util.HashSet;
035import java.util.List;
036import java.util.Map;
037import java.util.Map.Entry;
038import java.util.Optional;
039import java.util.Set;
040import java.util.SortedMap;
041import java.util.TreeMap;
042import java.util.UUID;
043import java.util.concurrent.Callable;
044import java.util.concurrent.CompletableFuture;
045import java.util.concurrent.ExecutionException;
046import java.util.concurrent.ExecutorService;
047import java.util.concurrent.Future;
048import java.util.concurrent.LinkedBlockingQueue;
049import java.util.concurrent.ThreadPoolExecutor;
050import java.util.concurrent.TimeUnit;
051import java.util.concurrent.atomic.AtomicInteger;
052import java.util.stream.Collectors;
053import org.apache.commons.lang3.mutable.MutableInt;
054import org.apache.hadoop.conf.Configuration;
055import org.apache.hadoop.conf.Configured;
056import org.apache.hadoop.fs.FileStatus;
057import org.apache.hadoop.fs.FileSystem;
058import org.apache.hadoop.fs.Path;
059import org.apache.hadoop.fs.permission.FsPermission;
060import org.apache.hadoop.hbase.Cell;
061import org.apache.hadoop.hbase.CellUtil;
062import org.apache.hadoop.hbase.ExtendedCell;
063import org.apache.hadoop.hbase.HBaseConfiguration;
064import org.apache.hadoop.hbase.HBaseInterfaceAudience;
065import org.apache.hadoop.hbase.HConstants;
066import org.apache.hadoop.hbase.HRegionLocation;
067import org.apache.hadoop.hbase.KeyValue;
068import org.apache.hadoop.hbase.TableName;
069import org.apache.hadoop.hbase.TableNotFoundException;
070import org.apache.hadoop.hbase.client.AsyncAdmin;
071import org.apache.hadoop.hbase.client.AsyncClusterConnection;
072import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
073import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
074import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
075import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
076import org.apache.hadoop.hbase.client.TableDescriptor;
077import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
078import org.apache.hadoop.hbase.io.HFileLink;
079import org.apache.hadoop.hbase.io.Reference;
080import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
081import org.apache.hadoop.hbase.io.hfile.CacheConfig;
082import org.apache.hadoop.hbase.io.hfile.HFile;
083import org.apache.hadoop.hbase.io.hfile.HFileContext;
084import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
085import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
086import org.apache.hadoop.hbase.io.hfile.HFileInfo;
087import org.apache.hadoop.hbase.io.hfile.ReaderContext;
088import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
089import org.apache.hadoop.hbase.regionserver.BloomType;
090import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
091import org.apache.hadoop.hbase.regionserver.StoreFileReader;
092import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
093import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
094import org.apache.hadoop.hbase.regionserver.StoreUtils;
095import org.apache.hadoop.hbase.security.UserProvider;
096import org.apache.hadoop.hbase.security.token.FsDelegationToken;
097import org.apache.hadoop.hbase.util.Bytes;
098import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
099import org.apache.hadoop.hbase.util.FSVisitor;
100import org.apache.hadoop.hbase.util.FutureUtils;
101import org.apache.hadoop.hbase.util.Pair;
102import org.apache.hadoop.util.Tool;
103import org.apache.hadoop.util.ToolRunner;
104import org.apache.yetus.audience.InterfaceAudience;
105import org.slf4j.Logger;
106import org.slf4j.LoggerFactory;
107
108import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
109import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
110import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
111import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
112import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
113import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
114
115/**
116 * The implementation for {@link BulkLoadHFiles}, and also can be executed from command line as a
117 * tool.
118 */
119@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
120public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, Tool {
121
122  private static final Logger LOG = LoggerFactory.getLogger(BulkLoadHFilesTool.class);
123
124  /**
125   * Keep locality while generating HFiles for bulkload. See HBASE-12596
126   */
127  public static final String LOCALITY_SENSITIVE_CONF_KEY =
128    "hbase.bulkload.locality.sensitive.enabled";
129  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
130
131  public static final String NAME = "completebulkload";
132  /**
133   * Whether to run validation on hfiles before loading.
134   */
135  private static final String VALIDATE_HFILES = "hbase.loadincremental.validate.hfile";
136  /**
137   * HBASE-24221 Support bulkLoadHFile by family to avoid long time waiting of bulkLoadHFile because
138   * of compacting at server side
139   */
140  public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family";
141
142  public static final String FAIL_IF_NEED_SPLIT_HFILE =
143    "hbase.loadincremental.fail.if.need.split.hfile";
144
  // The '.' prefix makes this an illegal column family name, so the directory is skipped when
  // the bulk load directory tree is walked for family directories.
147  static final String TMP_DIR = ".tmp";
148
149  private int maxFilesPerRegionPerFamily;
150  private boolean assignSeqIds;
151  private boolean bulkLoadByFamily;
152
153  // Source delegation token
154  private FsDelegationToken fsDelegationToken;
155  private UserProvider userProvider;
156  private int nrThreads;
157  private final AtomicInteger numRetries = new AtomicInteger(0);
158  private String bulkToken;
159
160  private List<String> clusterIds = new ArrayList<>();
161  private boolean replicate = true;
162  private boolean failIfNeedSplitHFile = false;
163
164  public BulkLoadHFilesTool(Configuration conf) {
165    // make a copy, just to be sure we're not overriding someone else's config
166    super(new Configuration(conf));
167    initialize();
168  }
169
170  public void initialize() {
171    Configuration conf = getConf();
172    // disable blockcache for tool invocation, see HBASE-10500
173    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
174    userProvider = UserProvider.instantiate(conf);
175    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
176    assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
177    maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
178    nrThreads =
179      conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors());
180    bulkLoadByFamily = conf.getBoolean(BULK_LOAD_HFILES_BY_FAMILY, false);
181    failIfNeedSplitHFile = conf.getBoolean(FAIL_IF_NEED_SPLIT_HFILE, false);
182  }
183
184  // Initialize a thread pool
185  private ExecutorService createExecutorService() {
186    ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
187      new LinkedBlockingQueue<>(),
188      new ThreadFactoryBuilder().setNameFormat("BulkLoadHFilesTool-%1$d").setDaemon(true).build());
189    pool.allowCoreThreadTimeOut(true);
190    return pool;
191  }
192
193  private boolean isCreateTable() {
194    return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"));
195  }
196
197  private boolean isSilence() {
198    return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
199  }
200
201  private boolean isAlwaysCopyFiles() {
202    return getConf().getBoolean(ALWAYS_COPY_FILES, false);
203  }
204
205  private static boolean shouldCopyHFileMetaKey(byte[] key) {
206    // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
207    if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
208      return false;
209    }
210
211    return !HFileInfo.isReservedFileInfoKey(key);
212  }
213
214  /**
215   * Checks whether there is any invalid family name in HFiles to be bulk loaded.
216   */
217  private static void validateFamiliesInHFiles(TableDescriptor tableDesc,
218    Deque<LoadQueueItem> queue, boolean silence) throws IOException {
219    Set<String> familyNames = Arrays.stream(tableDesc.getColumnFamilies())
220      .map(ColumnFamilyDescriptor::getNameAsString).collect(Collectors.toSet());
221    List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily()))
222      .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList());
223    if (unmatchedFamilies.size() > 0) {
224      String msg =
225        "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
226          + unmatchedFamilies + "; valid family names of table " + tableDesc.getTableName()
227          + " are: " + familyNames;
228      LOG.error(msg);
229      if (!silence) {
230        throw new IOException(msg);
231      }
232    }
233  }
234
235  /**
236   * Populate the Queue with given HFiles
237   */
238  private static void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) {
239    map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add));
240  }
241
242  private interface BulkHFileVisitor<TFamily> {
243
244    TFamily bulkFamily(byte[] familyName) throws IOException;
245
246    void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException;
247  }
248
249  /**
250   * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_". Check and
251   * skip non-valid hfiles by default, or skip this validation by setting {@link #VALIDATE_HFILES}
252   * to false.
253   */
254  private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
255    BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
256    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
257    for (FileStatus familyStat : familyDirStatuses) {
258      if (!familyStat.isDirectory()) {
259        LOG.warn("Skipping non-directory " + familyStat.getPath());
260        continue;
261      }
262      Path familyDir = familyStat.getPath();
263      byte[] familyName = Bytes.toBytes(familyDir.getName());
264      // Skip invalid family
265      try {
266        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName);
267      } catch (IllegalArgumentException e) {
268        LOG.warn("Skipping invalid " + familyStat.getPath());
269        continue;
270      }
271      TFamily family = visitor.bulkFamily(familyName);
272
273      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
274      for (FileStatus hfileStatus : hfileStatuses) {
275        if (!fs.isFile(hfileStatus.getPath())) {
276          LOG.warn("Skipping non-file " + hfileStatus);
277          continue;
278        }
279
280        Path hfile = hfileStatus.getPath();
281        // Skip "_", reference, HFileLink
282        String fileName = hfile.getName();
283        if (fileName.startsWith("_")) {
284          continue;
285        }
286        if (StoreFileInfo.isReference(fileName)) {
287          LOG.warn("Skipping reference " + fileName);
288          continue;
289        }
290        if (HFileLink.isHFileLink(fileName)) {
291          LOG.warn("Skipping HFileLink " + fileName);
292          continue;
293        }
294
295        // Validate HFile Format if needed
296        if (validateHFile) {
297          try {
298            if (!HFile.isHFileFormat(fs, hfile)) {
299              LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping");
300              continue;
301            }
302          } catch (FileNotFoundException e) {
303            LOG.warn("the file " + hfile + " was removed");
304            continue;
305          }
306        }
307
308        visitor.bulkHFile(family, hfileStatus);
309      }
310    }
311  }
312
313  /**
314   * Walk the given directory for all HFiles, and return a Queue containing all such files.
315   */
316  private static void discoverLoadQueue(Configuration conf, Deque<LoadQueueItem> ret, Path hfofDir,
317    boolean validateHFile) throws IOException {
318    visitBulkHFiles(hfofDir.getFileSystem(conf), hfofDir, new BulkHFileVisitor<byte[]>() {
319      @Override
320      public byte[] bulkFamily(final byte[] familyName) {
321        return familyName;
322      }
323
324      @Override
325      public void bulkHFile(final byte[] family, final FileStatus hfile) {
326        long length = hfile.getLen();
327        if (
328          length > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)
329        ) {
330          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length
331            + " bytes can be problematic as it may lead to oversplitting.");
332        }
333        ret.add(new LoadQueueItem(family, hfile.getPath()));
334      }
335    }, validateHFile);
336  }
337
338  /**
339   * Prepare a collection of {@code LoadQueueItem} from list of source hfiles contained in the
340   * passed directory and validates whether the prepared queue has all the valid table column
341   * families in it.
342   * @param map       map of family to List of hfiles
343   * @param tableName table to which hfiles should be loaded
344   * @param queue     queue which needs to be loaded into the table
345   * @param silence   true to ignore unmatched column families
346   * @throws IOException If any I/O or network error occurred
347   */
348  public static void prepareHFileQueue(AsyncClusterConnection conn, TableName tableName,
349    Map<byte[], List<Path>> map, Deque<LoadQueueItem> queue, boolean silence) throws IOException {
350    populateLoadQueue(queue, map);
351    validateFamiliesInHFiles(FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), queue,
352      silence);
353  }
354
355  /**
356   * Prepare a collection of {@code LoadQueueItem} from list of source hfiles contained in the
357   * passed directory and validates whether the prepared queue has all the valid table column
358   * families in it.
359   * @param hfilesDir     directory containing list of hfiles to be loaded into the table
360   * @param queue         queue which needs to be loaded into the table
361   * @param validateHFile if true hfiles will be validated for its format
362   * @param silence       true to ignore unmatched column families
363   * @throws IOException If any I/O or network error occurred
364   */
365  public static void prepareHFileQueue(Configuration conf, AsyncClusterConnection conn,
366    TableName tableName, Path hfilesDir, Deque<LoadQueueItem> queue, boolean validateHFile,
367    boolean silence) throws IOException {
368    discoverLoadQueue(conf, queue, hfilesDir, validateHFile);
369    validateFamiliesInHFiles(FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), queue,
370      silence);
371  }
372
373  /**
374   * Used by the replication sink to load the hfiles from the source cluster. It does the following,
375   * <ol>
376   * <li>{@link #groupOrSplitPhase(AsyncClusterConnection, TableName, ExecutorService, Deque, List)}
377   * </li>
378   * <li>{@link #bulkLoadPhase(AsyncClusterConnection, TableName, Deque, Multimap, boolean, Map)}
379   * </li>
380   * </ol>
381   * @param conn      Connection to use
382   * @param tableName Table to which these hfiles should be loaded to
383   * @param queue     {@code LoadQueueItem} has hfiles yet to be loaded
384   */
385  public void loadHFileQueue(AsyncClusterConnection conn, TableName tableName,
386    Deque<LoadQueueItem> queue, boolean copyFiles) throws IOException {
387    ExecutorService pool = createExecutorService();
388    try {
389      Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(conn, tableName, pool,
390        queue, FutureUtils.get(conn.getRegionLocator(tableName).getStartEndKeys())).getFirst();
391      bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, null);
392    } finally {
393      pool.shutdown();
394    }
395  }
396
397  /**
398   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
399   * hfiles that need to be retried. If it is successful it will return an empty list. NOTE: To
400   * maintain row atomicity guarantees, region server side should succeed atomically and fails
401   * atomically.
402   * @param conn      Connection to use
403   * @param tableName Table to which these hfiles should be loaded to
404   * @param copyFiles whether replicate to peer cluster while bulkloading
405   * @param first     the start key of region
406   * @param lqis      hfiles should be loaded
407   * @return empty list if success, list of items to retry on recoverable failure
408   */
409  @InterfaceAudience.Private
410  protected CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad(
411    final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles,
412    final byte[] first, Collection<LoadQueueItem> lqis) {
413    List<Pair<byte[], String>> familyPaths =
414      lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
415        .collect(Collectors.toList());
416    CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
417    FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
418      fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
419      (loaded, error) -> {
420        if (error != null) {
421          LOG.error("Encountered unrecoverable error from region server", error);
422          if (
423            getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
424              && numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
425                HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)
426          ) {
427            LOG.warn("Will attempt to retry loading failed HFiles. Retry #"
428              + numRetries.incrementAndGet());
429            // return lqi's to retry
430            future.complete(lqis);
431          } else {
432            LOG.error(RETRY_ON_IO_EXCEPTION
433              + " is disabled or we have reached retry limit. Unable to recover");
434            future.completeExceptionally(error);
435          }
436        } else {
437          if (loaded) {
438            future.complete(Collections.emptyList());
439          } else {
440            LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first)
441              + " into table " + tableName + " with files " + lqis
442              + " failed.  This is recoverable and they will be retried.");
443            // return lqi's to retry
444            future.complete(lqis);
445          }
446        }
447      });
448    return future;
449  }
450
451  /**
452   * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures are
453   * re-queued for another pass with the groupOrSplitPhase.
454   * <p/>
455   * protected for testing.
456   */
457  @InterfaceAudience.Private
458  protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
459    Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFiles,
460    Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
461    // atomically bulk load the groups.
462    List<Future<Collection<LoadQueueItem>>> loadingFutures = new ArrayList<>();
463    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> entry : regionGroups.asMap()
464      .entrySet()) {
465      byte[] first = entry.getKey().array();
466      final Collection<LoadQueueItem> lqis = entry.getValue();
467      if (bulkLoadByFamily) {
468        groupByFamilies(lqis).values().forEach(familyQueue -> loadingFutures
469          .add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, familyQueue)));
470      } else {
471        loadingFutures.add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, lqis));
472      }
473      if (item2RegionMap != null) {
474        for (LoadQueueItem lqi : lqis) {
475          item2RegionMap.put(lqi, entry.getKey());
476        }
477      }
478    }
479
480    // get all the results.
481    for (Future<Collection<LoadQueueItem>> future : loadingFutures) {
482      try {
483        Collection<LoadQueueItem> toRetry = future.get();
484
485        if (item2RegionMap != null) {
486          for (LoadQueueItem lqi : toRetry) {
487            item2RegionMap.remove(lqi);
488          }
489        }
490        // LQIs that are requeued to be regrouped.
491        queue.addAll(toRetry);
492      } catch (ExecutionException e1) {
493        Throwable t = e1.getCause();
494        if (t instanceof IOException) {
495          // At this point something unrecoverable has happened.
496          // TODO Implement bulk load recovery
497          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
498        }
499        LOG.error("Unexpected execution exception during bulk load", e1);
500        throw new IllegalStateException(t);
501      } catch (InterruptedException e1) {
502        LOG.error("Unexpected interrupted exception during bulk load", e1);
503        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
504      }
505    }
506  }
507
508  private Map<byte[], Collection<LoadQueueItem>>
509    groupByFamilies(Collection<LoadQueueItem> itemsInRegion) {
510    Map<byte[], Collection<LoadQueueItem>> families2Queue = new TreeMap<>(Bytes.BYTES_COMPARATOR);
511    itemsInRegion.forEach(item -> families2Queue
512      .computeIfAbsent(item.getFamily(), queue -> new ArrayList<>()).add(item));
513    return families2Queue;
514  }
515
516  private boolean
517    checkHFilesCountPerRegionPerFamily(final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
518    for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
519      Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
520      for (LoadQueueItem lqi : e.getValue()) {
521        MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt());
522        count.increment();
523        if (count.intValue() > maxFilesPerRegionPerFamily) {
524          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily + " hfiles to family "
525            + Bytes.toStringBinary(lqi.getFamily()) + " of region with start key "
526            + Bytes.toStringBinary(e.getKey()));
527          return false;
528        }
529      }
530    }
531    return true;
532  }
533
534  /**
535   * @param conn         the HBase cluster connection
536   * @param tableName    the table name of the table to load into
537   * @param pool         the ExecutorService
538   * @param queue        the queue for LoadQueueItem
539   * @param startEndKeys start and end keys
540   * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles.
541   */
542  private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
543    AsyncClusterConnection conn, TableName tableName, ExecutorService pool,
544    Deque<LoadQueueItem> queue, List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
545    // <region start key, LQI> need synchronized only within this scope of this
546    // phase because of the puts that happen in futures.
547    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
548    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
549    Set<String> missingHFiles = new HashSet<>();
550    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair =
551      new Pair<>(regionGroups, missingHFiles);
552
553    // drain LQIs and figure out bulk load groups
554    Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
555    while (!queue.isEmpty()) {
556      final LoadQueueItem item = queue.remove();
557      final Callable<Pair<List<LoadQueueItem>, String>> call =
558        () -> groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
559      splittingFutures.add(pool.submit(call));
560    }
561    // get all the results. All grouping and splitting must finish before
562    // we can attempt the atomic loads.
563    for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
564      try {
565        Pair<List<LoadQueueItem>, String> splits = lqis.get();
566        if (splits != null) {
567          if (splits.getFirst() != null) {
568            queue.addAll(splits.getFirst());
569          } else {
570            missingHFiles.add(splits.getSecond());
571          }
572        }
573      } catch (ExecutionException e1) {
574        Throwable t = e1.getCause();
575        if (t instanceof IOException) {
576          LOG.error("IOException during splitting", e1);
577          throw (IOException) t; // would have been thrown if not parallelized,
578        }
579        LOG.error("Unexpected execution exception during splitting", e1);
580        throw new IllegalStateException(t);
581      } catch (InterruptedException e1) {
582        LOG.error("Unexpected interrupted exception during splitting", e1);
583        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
584      }
585    }
586    return pair;
587  }
588
589  // unique file name for the table
590  private String getUniqueName() {
591    return UUID.randomUUID().toString().replaceAll("-", "");
592  }
593
594  private List<LoadQueueItem> splitStoreFile(AsyncTableRegionLocator loc, LoadQueueItem item,
595    TableDescriptor tableDesc, byte[] splitKey) throws IOException {
596    Path hfilePath = item.getFilePath();
597    byte[] family = item.getFamily();
598    Path tmpDir = hfilePath.getParent();
599    if (!tmpDir.getName().equals(TMP_DIR)) {
600      tmpDir = new Path(tmpDir, TMP_DIR);
601    }
602
603    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. Splitting...");
604
605    String uniqueName = getUniqueName();
606    ColumnFamilyDescriptor familyDesc = tableDesc.getColumnFamily(family);
607
608    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
609    Path topOut = new Path(tmpDir, uniqueName + ".top");
610
611    splitStoreFile(loc, getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
612
613    FileSystem fs = tmpDir.getFileSystem(getConf());
614    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
615    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
616    fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));
617
618    // Add these back at the *front* of the queue, so there's a lower
619    // chance that the region will just split again before we get there.
620    List<LoadQueueItem> lqis = new ArrayList<>(2);
621    lqis.add(new LoadQueueItem(family, botOut));
622    lqis.add(new LoadQueueItem(family, topOut));
623
624    // If the current item is already the result of previous splits,
625    // we don't need it anymore. Clean up to save space.
626    // It is not part of the original input files.
627    try {
628      if (tmpDir.getName().equals(TMP_DIR)) {
629        fs.delete(hfilePath, false);
630      }
631    } catch (IOException e) {
632      LOG.warn("Unable to delete temporary split file " + hfilePath);
633    }
634    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
635    return lqis;
636  }
637
638  /**
639   * @param startEndKeys the start/end keys of regions belong to this table, the list in ascending
640   *                     order by start key
641   * @param key          the key need to find which region belong to
642   * @return region index
643   */
644  private int getRegionIndex(List<Pair<byte[], byte[]>> startEndKeys, byte[] key) {
645    int idx = Collections.binarySearch(startEndKeys, Pair.newPair(key, HConstants.EMPTY_END_ROW),
646      (p1, p2) -> Bytes.compareTo(p1.getFirst(), p2.getFirst()));
647    if (idx < 0) {
648      // not on boundary, returns -(insertion index). Calculate region it
649      // would be in.
650      idx = -(idx + 1) - 1;
651    }
652    return idx;
653  }
654
655  /**
656   * we can consider there is a region hole or overlap in following conditions. 1) if idx < 0,then
657   * first region info is lost. 2) if the endkey of a region is not equal to the startkey of the
658   * next region. 3) if the endkey of the last region is not empty.
659   */
660  private void checkRegionIndexValid(int idx, List<Pair<byte[], byte[]>> startEndKeys,
661    TableName tableName) throws IOException {
662    if (idx < 0) {
663      throw new IOException("The first region info for table " + tableName
664        + " can't be found in hbase:meta.Please use hbck tool to fix it first.");
665    } else if (
666      (idx == startEndKeys.size() - 1)
667        && !Bytes.equals(startEndKeys.get(idx).getSecond(), HConstants.EMPTY_BYTE_ARRAY)
668    ) {
669      throw new IOException("The last region info for table " + tableName
670        + " can't be found in hbase:meta.Please use hbck tool to fix it first.");
671    } else if (
672      idx + 1 < startEndKeys.size() && !(Bytes.compareTo(startEndKeys.get(idx).getSecond(),
673        startEndKeys.get(idx + 1).getFirst()) == 0)
674    ) {
675      throw new IOException("The endkey of one region for table " + tableName
676        + " is not equal to the startkey of the next region in hbase:meta."
677        + "Please use hbck tool to fix it first.");
678    }
679  }
680
681  /**
682   * Attempt to assign the given load queue item into its target region group. If the hfile boundary
683   * no longer fits into a region, physically splits the hfile such that the new bottom half will
684   * fit and returns the list of LQI's corresponding to the resultant hfiles.
685   * <p/>
686   * protected for testing
687   * @throws IOException if an IO failure is encountered
688   */
689  @InterfaceAudience.Private
690  protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn,
691    TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item,
692    List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
693    Path hfilePath = item.getFilePath();
694    Optional<byte[]> first, last;
695    try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
696      CacheConfig.DISABLED, true, getConf())) {
697      first = hfr.getFirstRowKey();
698      last = hfr.getLastRowKey();
699    } catch (FileNotFoundException fnfe) {
700      LOG.debug("encountered", fnfe);
701      return new Pair<>(null, hfilePath.getName());
702    }
703
704    LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary)
705      + " last=" + last.map(Bytes::toStringBinary));
706    if (!first.isPresent() || !last.isPresent()) {
707      assert !first.isPresent() && !last.isPresent();
708      // TODO what if this is due to a bad HFile?
709      LOG.info("hfile " + hfilePath + " has no entries, skipping");
710      return null;
711    }
712    if (Bytes.compareTo(first.get(), last.get()) > 0) {
713      throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get())
714        + " > " + Bytes.toStringBinary(last.get()));
715    }
716    int firstKeyRegionIdx = getRegionIndex(startEndKeys, first.get());
717    checkRegionIndexValid(firstKeyRegionIdx, startEndKeys, tableName);
718    boolean lastKeyInRange =
719      Bytes.compareTo(last.get(), startEndKeys.get(firstKeyRegionIdx).getSecond()) < 0 || Bytes
720        .equals(startEndKeys.get(firstKeyRegionIdx).getSecond(), HConstants.EMPTY_BYTE_ARRAY);
721    if (!lastKeyInRange) {
722      if (failIfNeedSplitHFile) {
723        throw new IOException(
724          "The key range of hfile=" + hfilePath + " fits into no region. " + "And because "
725            + FAIL_IF_NEED_SPLIT_HFILE + " was set to true, we just skip the next steps.");
726      }
727      int lastKeyRegionIdx = getRegionIndex(startEndKeys, last.get());
728      int splitIdx = (firstKeyRegionIdx + lastKeyRegionIdx) / 2;
729      // make sure the splitPoint is valid in case region overlap occur, maybe the splitPoint bigger
730      // than hfile.endkey w/o this check
731      if (splitIdx != firstKeyRegionIdx) {
732        checkRegionIndexValid(splitIdx, startEndKeys, tableName);
733      }
734      byte[] splitPoint = startEndKeys.get(splitIdx).getSecond();
735      List<LoadQueueItem> lqis = splitStoreFile(conn.getRegionLocator(tableName), item,
736        FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint);
737
738      return new Pair<>(lqis, null);
739    }
740
741    // group regions.
742    regionGroups.put(ByteBuffer.wrap(startEndKeys.get(firstKeyRegionIdx).getFirst()), item);
743    return null;
744  }
745
746  /**
747   * Split a storefile into a top and bottom half with favored nodes, maintaining the metadata,
748   * recreating bloom filters, etc.
749   */
750  @InterfaceAudience.Private
751  static void splitStoreFile(AsyncTableRegionLocator loc, Configuration conf, Path inFile,
752    ColumnFamilyDescriptor familyDesc, byte[] splitKey, Path bottomOut, Path topOut)
753    throws IOException {
754    // Open reader with no block cache, and not in-memory
755    Reference topReference = Reference.createTopReference(splitKey);
756    Reference bottomReference = Reference.createBottomReference(splitKey);
757
758    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc, loc);
759    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc, loc);
760  }
761
762  private static StoreFileWriter initStoreFileWriter(Configuration conf, Cell cell,
763    HFileContext hFileContext, CacheConfig cacheConf, BloomType bloomFilterType, FileSystem fs,
764    Path outFile, AsyncTableRegionLocator loc) throws IOException {
765    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
766      byte[] rowKey = CellUtil.cloneRow(cell);
767      HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey));
768      InetSocketAddress[] favoredNodes = null;
769      if (null == hRegionLocation) {
770        LOG.warn("Failed get region location for  rowkey {} , Using writer without favoured nodes.",
771          Bytes.toString(rowKey));
772        return new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
773          .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
774      } else {
775        LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
776        InetSocketAddress initialIsa =
777          new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort());
778        if (initialIsa.isUnresolved()) {
779          LOG.warn("Failed get location for region {} , Using writer without favoured nodes.",
780            hRegionLocation);
781          return new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
782            .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
783        } else {
784          LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
785          favoredNodes = new InetSocketAddress[] { initialIsa };
786          return new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
787            .withBloomType(bloomFilterType).withFileContext(hFileContext)
788            .withFavoredNodes(favoredNodes).build();
789        }
790      }
791    } else {
792      return new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
793        .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
794    }
795  }
796
797  /**
798   * Copy half of an HFile into a new HFile with favored nodes.
799   */
800  private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
801    Reference reference, ColumnFamilyDescriptor familyDescriptor, AsyncTableRegionLocator loc)
802    throws IOException {
803    FileSystem fs = inFile.getFileSystem(conf);
804    CacheConfig cacheConf = CacheConfig.DISABLED;
805    StoreFileReader halfReader = null;
806    StoreFileWriter halfWriter = null;
807    try {
808      ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build();
809      StoreFileInfo storeFileInfo =
810        new StoreFileInfo(conf, fs, fs.getFileStatus(inFile), reference);
811      storeFileInfo.initHFileInfo(context);
812      halfReader = storeFileInfo.createReader(context, cacheConf);
813      storeFileInfo.getHFileInfo().initMetaAndIndex(halfReader.getHFileReader());
814      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
815
816      int blocksize = familyDescriptor.getBlocksize();
817      Algorithm compression = familyDescriptor.getCompressionType();
818      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
819      HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
820        .withChecksumType(StoreUtils.getChecksumType(conf))
821        .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize)
822        .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
823        .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
824
825      try (StoreFileScanner scanner =
826        halfReader.getStoreFileScanner(false, false, false, Long.MAX_VALUE, 0, false)) {
827        scanner.seek(KeyValue.LOWESTKEY);
828        for (;;) {
829          ExtendedCell cell = scanner.next();
830          if (cell == null) {
831            break;
832          }
833          if (halfWriter == null) {
834            // init halfwriter
835            halfWriter = initStoreFileWriter(conf, cell, hFileContext, cacheConf, bloomFilterType,
836              fs, outFile, loc);
837          }
838          halfWriter.append(cell);
839        }
840      }
841      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
842        if (shouldCopyHFileMetaKey(entry.getKey())) {
843          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
844        }
845      }
846    } finally {
847      if (halfReader != null) {
848        try {
849          halfReader.close(cacheConf.shouldEvictOnClose());
850        } catch (IOException e) {
851          LOG.warn("failed to close hfile reader for " + inFile, e);
852        }
853      }
854      if (halfWriter != null) {
855        halfWriter.close();
856      }
857    }
858  }
859
860  /**
861   * Infers region boundaries for a new table.
862   * <p/>
863   * Parameter: <br/>
864   * bdryMap is a map between keys to an integer belonging to {+1, -1}
865   * <ul>
866   * <li>If a key is a start key of a file, then it maps to +1</li>
867   * <li>If a key is an end key of a file, then it maps to -1</li>
868   * </ul>
869   * <p>
870   * Algo:<br/>
871   * <ol>
872   * <li>Poll on the keys in order:
873   * <ol type="a">
874   * <li>Keep adding the mapped values to these keys (runningSum)</li>
875   * <li>Each time runningSum reaches 0, add the start Key from when the runningSum had started to a
876   * boundary list.</li>
877   * </ol>
878   * </li>
879   * <li>Return the boundary list.</li>
880   * </ol>
881   */
882  public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) {
883    List<byte[]> keysArray = new ArrayList<>();
884    int runningValue = 0;
885    byte[] currStartKey = null;
886    boolean firstBoundary = true;
887
888    for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
889      if (runningValue == 0) {
890        currStartKey = item.getKey();
891      }
892      runningValue += item.getValue();
893      if (runningValue == 0) {
894        if (!firstBoundary) {
895          keysArray.add(currStartKey);
896        }
897        firstBoundary = false;
898      }
899    }
900
901    return keysArray.toArray(new byte[0][]);
902  }
903
904  /**
905   * If the table is created for the first time, then "completebulkload" reads the files twice. More
906   * modifications necessary if we want to avoid doing it.
907   */
908  private void createTable(TableName tableName, Path hfofDir, AsyncAdmin admin) throws IOException {
909    final FileSystem fs = hfofDir.getFileSystem(getConf());
910
911    // Add column families
912    // Build a set of keys
913    List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>();
914    SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
915    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() {
916      @Override
917      public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) {
918        ColumnFamilyDescriptorBuilder builder =
919          ColumnFamilyDescriptorBuilder.newBuilder(familyName);
920        familyBuilders.add(builder);
921        return builder;
922      }
923
924      @Override
925      public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus)
926        throws IOException {
927        Path hfile = hfileStatus.getPath();
928        try (HFile.Reader reader =
929          HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) {
930          if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
931            builder.setCompressionType(reader.getFileContext().getCompression());
932            LOG.info("Setting compression " + reader.getFileContext().getCompression().name()
933              + " for family " + builder.getNameAsString());
934          }
935          byte[] first = reader.getFirstRowKey().get();
936          byte[] last = reader.getLastRowKey().get();
937
938          LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first="
939            + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
940
941          // To eventually infer start key-end key boundaries
942          Integer value = map.getOrDefault(first, 0);
943          map.put(first, value + 1);
944
945          value = map.containsKey(last) ? map.get(last) : 0;
946          map.put(last, value - 1);
947        }
948      }
949    }, true);
950
951    byte[][] keys = inferBoundaries(map);
952    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
953    familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build)
954      .forEachOrdered(tdBuilder::setColumnFamily);
955    FutureUtils.get(admin.createTable(tdBuilder.build(), keys));
956
957    LOG.info("Table " + tableName + " is available!!");
958  }
959
960  private Map<LoadQueueItem, ByteBuffer> performBulkLoad(AsyncClusterConnection conn,
961    TableName tableName, Deque<LoadQueueItem> queue, ExecutorService pool, boolean copyFile)
962    throws IOException {
963    int count = 0;
964
965    fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
966    bulkToken = FutureUtils.get(conn.prepareBulkLoad(tableName));
967    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;
968
969    Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
970    // Assumes that region splits can happen while this occurs.
971    while (!queue.isEmpty()) {
972      // need to reload split keys each iteration.
973      final List<Pair<byte[], byte[]>> startEndKeys =
974        FutureUtils.get(conn.getRegionLocator(tableName).getStartEndKeys());
975      if (count != 0) {
976        LOG.info("Split occurred while grouping HFiles, retry attempt " + count + " with "
977          + queue.size() + " files remaining to group or split");
978      }
979
980      int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
981      maxRetries = Math.max(maxRetries, startEndKeys.size() + 1);
982      if (maxRetries != 0 && count >= maxRetries) {
983        throw new IOException(
984          "Retry attempted " + count + " times without completing, bailing out");
985      }
986      count++;
987
988      // Using ByteBuffer for byte[] equality semantics
989      pair = groupOrSplitPhase(conn, tableName, pool, queue, startEndKeys);
990      Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();
991
992      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
993        // Error is logged inside checkHFilesCountPerRegionPerFamily.
994        throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
995          + " hfiles to one family of one region");
996      }
997
998      bulkLoadPhase(conn, tableName, queue, regionGroups, copyFile, item2RegionMap);
999
1000      // NOTE: The next iteration's split / group could happen in parallel to
1001      // atomic bulkloads assuming that there are splits and no merges, and
1002      // that we can atomically pull out the groups we want to retry.
1003    }
1004
1005    return item2RegionMap;
1006  }
1007
1008  private void cleanup(AsyncClusterConnection conn, TableName tableName, Deque<LoadQueueItem> queue,
1009    ExecutorService pool) throws IOException {
1010    fsDelegationToken.releaseDelegationToken();
1011    if (bulkToken != null) {
1012      conn.cleanupBulkLoad(tableName, bulkToken);
1013    }
1014    if (pool != null) {
1015      pool.shutdown();
1016    }
1017    if (!queue.isEmpty()) {
1018      StringBuilder err = new StringBuilder();
1019      err.append("-------------------------------------------------\n");
1020      err.append("Bulk load aborted with some files not yet loaded:\n");
1021      err.append("-------------------------------------------------\n");
1022      for (LoadQueueItem q : queue) {
1023        err.append("  ").append(q.getFilePath()).append('\n');
1024      }
1025      LOG.error(err.toString());
1026    }
1027  }
1028
1029  /**
1030   * Perform a bulk load of the given map of families to hfiles into the given pre-existing table.
1031   * This method is not threadsafe.
1032   * @param map       map of family to List of hfiles
1033   * @param tableName table to load the hfiles
1034   * @param silence   true to ignore unmatched column families
1035   * @param copyFile  always copy hfiles if true
1036   */
1037  private Map<LoadQueueItem, ByteBuffer> doBulkLoad(AsyncClusterConnection conn,
1038    TableName tableName, Map<byte[], List<Path>> map, boolean silence, boolean copyFile)
1039    throws IOException {
1040    tableExists(conn, tableName);
1041    // LQI queue does not need to be threadsafe -- all operations on this queue
1042    // happen in this thread
1043    Deque<LoadQueueItem> queue = new ArrayDeque<>();
1044    ExecutorService pool = null;
1045    try {
1046      prepareHFileQueue(conn, tableName, map, queue, silence);
1047      if (queue.isEmpty()) {
1048        LOG.warn("Bulk load operation did not get any files to load");
1049        return Collections.emptyMap();
1050      }
1051      pool = createExecutorService();
1052      return performBulkLoad(conn, tableName, queue, pool, copyFile);
1053    } finally {
1054      cleanup(conn, tableName, queue, pool);
1055    }
1056  }
1057
1058  /**
1059   * Perform a bulk load of the given directory into the given pre-existing table. This method is
1060   * not threadsafe.
1061   * @param tableName table to load the hfiles
1062   * @param hfofDir   the directory that was provided as the output path of a job using
1063   *                  HFileOutputFormat
1064   * @param silence   true to ignore unmatched column families
1065   * @param copyFile  always copy hfiles if true
1066   */
1067  private Map<LoadQueueItem, ByteBuffer> doBulkLoad(AsyncClusterConnection conn,
1068    TableName tableName, Path hfofDir, boolean silence, boolean copyFile) throws IOException {
1069    tableExists(conn, tableName);
1070
1071    /*
1072     * Checking hfile format is a time-consuming operation, we should have an option to skip this
1073     * step when bulkloading millions of HFiles. See HBASE-13985.
1074     */
1075    boolean validateHFile = getConf().getBoolean(VALIDATE_HFILES, true);
1076    if (!validateHFile) {
1077      LOG.warn("You are skipping HFiles validation, it might cause some data loss if files "
1078        + "are not correct. If you fail to read data from your table after using this "
1079        + "option, consider removing the files and bulkload again without this option. "
1080        + "See HBASE-13985");
1081    }
1082    // LQI queue does not need to be threadsafe -- all operations on this queue
1083    // happen in this thread
1084    Deque<LoadQueueItem> queue = new ArrayDeque<>();
1085    ExecutorService pool = null;
1086    try {
1087      prepareHFileQueue(getConf(), conn, tableName, hfofDir, queue, validateHFile, silence);
1088
1089      if (queue.isEmpty()) {
1090        LOG.warn(
1091          "Bulk load operation did not find any files to load in directory {}. "
1092            + "Does it contain files in subdirectories that correspond to column family names?",
1093          (hfofDir != null ? hfofDir.toUri().toString() : ""));
1094        return Collections.emptyMap();
1095      }
1096      pool = createExecutorService();
1097      return performBulkLoad(conn, tableName, queue, pool, copyFile);
1098    } finally {
1099      cleanup(conn, tableName, queue, pool);
1100    }
1101  }
1102
1103  @Override
1104  public Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName,
1105    Map<byte[], List<Path>> family2Files) throws IOException {
1106    try (AsyncClusterConnection conn = ClusterConnectionFactory
1107      .createAsyncClusterConnection(getConf(), null, userProvider.getCurrent())) {
1108      return doBulkLoad(conn, tableName, family2Files, isSilence(), isAlwaysCopyFiles());
1109    }
1110  }
1111
1112  @Override
1113  public Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir) throws IOException {
1114    try (AsyncClusterConnection conn = ClusterConnectionFactory
1115      .createAsyncClusterConnection(getConf(), null, userProvider.getCurrent())) {
1116      AsyncAdmin admin = conn.getAdmin();
1117      if (!FutureUtils.get(admin.tableExists(tableName))) {
1118        if (isCreateTable()) {
1119          createTable(tableName, dir, admin);
1120        } else {
1121          throwAndLogTableNotFoundException(tableName);
1122        }
1123      }
1124      return doBulkLoad(conn, tableName, dir, isSilence(), isAlwaysCopyFiles());
1125    }
1126  }
1127
1128  /**
1129   * @throws TableNotFoundException if table does not exist.
1130   */
1131  private void tableExists(AsyncClusterConnection conn, TableName tableName) throws IOException {
1132    if (!FutureUtils.get(conn.getAdmin().tableExists(tableName))) {
1133      throwAndLogTableNotFoundException(tableName);
1134    }
1135  }
1136
1137  private void throwAndLogTableNotFoundException(TableName tn) throws TableNotFoundException {
1138    String errorMsg = format("Table '%s' does not exist.", tn);
1139    LOG.error(errorMsg);
1140    throw new TableNotFoundException(errorMsg);
1141  }
1142
1143  public void setBulkToken(String bulkToken) {
1144    this.bulkToken = bulkToken;
1145  }
1146
1147  public void setClusterIds(List<String> clusterIds) {
1148    this.clusterIds = clusterIds;
1149  }
1150
1151  private void usage() {
1152    System.err.println("Usage: " + "bin/hbase completebulkload [OPTIONS] "
1153      + "</PATH/TO/HFILEOUTPUTFORMAT-OUTPUT> <TABLENAME>\n"
1154      + "Loads directory of hfiles -- a region dir or product of HFileOutputFormat -- "
1155      + "into an hbase table.\n" + "OPTIONS (for other -D options, see source code):\n" + " -D"
1156      + CREATE_TABLE_CONF_KEY + "=no whether to create table; when 'no', target "
1157      + "table must exist.\n" + " -D" + IGNORE_UNMATCHED_CF_CONF_KEY
1158      + "=yes to ignore unmatched column families.\n"
1159      + " -loadTable for when directory of files to load has a depth of 3; target table must "
1160      + "exist;\n" + " must be last of the options on command line.\n"
1161      + "See http://hbase.apache.org/book.html#arch.bulk.load.complete.strays for "
1162      + "documentation.\n");
1163  }
1164
1165  @Override
1166  public int run(String[] args) throws Exception {
1167    if (args.length != 2 && args.length != 3) {
1168      usage();
1169      return -1;
1170    }
1171    // Re-initialize to apply -D options from the command line parameters
1172    initialize();
1173    Path dirPath = new Path(args[0]);
1174    TableName tableName = TableName.valueOf(args[1]);
1175    if (args.length == 2) {
1176      return !bulkLoad(tableName, dirPath).isEmpty() ? 0 : -1;
1177    } else {
1178      Map<byte[], List<Path>> family2Files = Maps.newHashMap();
1179      FileSystem fs = FileSystem.get(getConf());
1180      for (FileStatus regionDir : fs.listStatus(dirPath)) {
1181        FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
1182          Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
1183          byte[] familyName = Bytes.toBytes(family);
1184          if (family2Files.containsKey(familyName)) {
1185            family2Files.get(familyName).add(path);
1186          } else {
1187            family2Files.put(familyName, Lists.newArrayList(path));
1188          }
1189        });
1190      }
1191      return !bulkLoad(tableName, family2Files).isEmpty() ? 0 : -1;
1192    }
1193  }
1194
1195  public static void main(String[] args) throws Exception {
1196    Configuration conf = HBaseConfiguration.create();
1197    int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
1198    System.exit(ret);
1199  }
1200
1201  @Override
1202  public void disableReplication() {
1203    this.replicate = false;
1204  }
1205
1206  @Override
1207  public boolean isReplicationDisabled() {
1208    return !this.replicate;
1209  }
1210}