001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
022import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
023import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
024
025import java.io.IOException;
026import java.io.UnsupportedEncodingException;
027import java.net.InetSocketAddress;
028import java.net.URLDecoder;
029import java.net.URLEncoder;
030import java.nio.charset.Charset;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.List;
035import java.util.Map;
036import java.util.Map.Entry;
037import java.util.Set;
038import java.util.TreeMap;
039import java.util.TreeSet;
040import java.util.UUID;
041import java.util.function.Function;
042import java.util.stream.Collectors;
043import org.apache.commons.lang3.StringUtils;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.FileSystem;
046import org.apache.hadoop.fs.Path;
047import org.apache.hadoop.hbase.Cell;
048import org.apache.hadoop.hbase.CellUtil;
049import org.apache.hadoop.hbase.HConstants;
050import org.apache.hadoop.hbase.HRegionLocation;
051import org.apache.hadoop.hbase.HTableDescriptor;
052import org.apache.hadoop.hbase.KeyValue;
053import org.apache.hadoop.hbase.PrivateCellUtil;
054import org.apache.hadoop.hbase.TableName;
055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
056import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
057import org.apache.hadoop.hbase.client.Connection;
058import org.apache.hadoop.hbase.client.ConnectionFactory;
059import org.apache.hadoop.hbase.client.Put;
060import org.apache.hadoop.hbase.client.RegionLocator;
061import org.apache.hadoop.hbase.client.Table;
062import org.apache.hadoop.hbase.client.TableDescriptor;
063import org.apache.hadoop.hbase.fs.HFileSystem;
064import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
065import org.apache.hadoop.hbase.io.compress.Compression;
066import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
067import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
068import org.apache.hadoop.hbase.io.hfile.CacheConfig;
069import org.apache.hadoop.hbase.io.hfile.HFile;
070import org.apache.hadoop.hbase.io.hfile.HFileContext;
071import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
072import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
073import org.apache.hadoop.hbase.regionserver.BloomType;
074import org.apache.hadoop.hbase.regionserver.HStore;
075import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
076import org.apache.hadoop.hbase.regionserver.StoreUtils;
077import org.apache.hadoop.hbase.util.BloomFilterUtil;
078import org.apache.hadoop.hbase.util.Bytes;
079import org.apache.hadoop.hbase.util.CommonFSUtils;
080import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
081import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
082import org.apache.hadoop.io.NullWritable;
083import org.apache.hadoop.io.SequenceFile;
084import org.apache.hadoop.io.Text;
085import org.apache.hadoop.mapreduce.Job;
086import org.apache.hadoop.mapreduce.OutputCommitter;
087import org.apache.hadoop.mapreduce.OutputFormat;
088import org.apache.hadoop.mapreduce.RecordWriter;
089import org.apache.hadoop.mapreduce.TaskAttemptContext;
090import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
091import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
092import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
093import org.apache.yetus.audience.InterfaceAudience;
094import org.slf4j.Logger;
095import org.slf4j.LoggerFactory;
096
097/**
098 * Writes HFiles. Passed Cells must arrive in order. Writes current time as the sequence id for the
099 * file. Sets the major compacted attribute on created {@link HFile}s. Calling write(null,null) will
100 * forcibly roll all HFiles being written.
101 * <p>
102 * Using this class as part of a MapReduce job is best done using
103 * {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
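 * <p>
 * A minimal driver sketch (the table name, mapper and output path below are illustrative, not
 * part of this API):
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * Job job = Job.getInstance(conf, "hfile-prepare");
 * job.setJarByClass(MyDriver.class);   // placeholder driver class
 * job.setMapperClass(MyMapper.class);  // placeholder mapper emitting ImmutableBytesWritable/Put
 * FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles"));
 * try (Connection connection = ConnectionFactory.createConnection(conf);
 *   RegionLocator locator = connection.getRegionLocator(TableName.valueOf("mytable"));
 *   Table table = connection.getTable(TableName.valueOf("mytable"))) {
 *   HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
 * }
 * job.waitForCompletion(true);
 * }</pre>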
104 */
105@InterfaceAudience.Public
106public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, Cell> {
107  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);
108
109  static class TableInfo {
    private TableDescriptor tableDescriptor;
111    private RegionLocator regionLocator;
112
    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
115      this.regionLocator = regionLocator;
116    }
117
118    /**
     * Modifications to the returned HTD do not affect the inner TD.
120     * @return A clone of inner table descriptor
121     * @deprecated since 2.0.0 and will be removed in 3.0.0. Use {@link #getTableDescriptor()}
122     *             instead.
123     * @see #getTableDescriptor()
124     * @see <a href="https://issues.apache.org/jira/browse/HBASE-18241">HBASE-18241</a>
125     */
126    @Deprecated
127    public HTableDescriptor getHTableDescriptor() {
      return new HTableDescriptor(tableDescriptor);
129    }
130
131    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
133    }
134
135    public RegionLocator getRegionLocator() {
136      return regionLocator;
137    }
138  }
139
140  protected static final byte[] tableSeparator = Bytes.toBytes(";");
141
142  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
143    return Bytes.add(tableName, tableSeparator, suffix);
144  }
145
146  // The following constants are private since these are used by
147  // HFileOutputFormat2 to internally transfer data between job setup and
148  // reducer run using conf.
149  // These should not be changed by the client.
150  static final String COMPRESSION_FAMILIES_CONF_KEY =
151    "hbase.hfileoutputformat.families.compression";
152  static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
153  static final String BLOOM_PARAM_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomparam";
154  static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
155  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
156    "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
157
  // When MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY is enabled, this controls whether table names are
  // written with the namespace included. Enabling it means downstream jobs which use this output
  // will need to account for the namespace when finding the directory of the job output.
  // For example, a table named my-table in namespace default would be written to
  // `/output/default/my-table` instead of the current `/output/my-table`.
  // This will be the behavior when upgrading to HBase 3.0.
164  public static final String TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY =
165    "hbase.hfileoutputformat.tablename.namespace.inclusive";
166
167  private static final boolean TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_DEFAULT_VALUE = false;
168
169  // This constant is public since the client can modify this when setting
170  // up their conf object and thus refer to this symbol.
171  // It is present for backwards compatibility reasons. Use it only to
172  // override the auto-detection of datablock encoding and compression.
173  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
174    "hbase.mapreduce.hfileoutputformat.datablock.encoding";
175  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
176    "hbase.mapreduce.hfileoutputformat.compression";
177
178  /**
179   * Keep locality while generating HFiles for bulkload. See HBASE-12596
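   * <p>
   * Enabled by default; a job can opt out with (a sketch):
   * {@code job.getConfiguration().setBoolean(LOCALITY_SENSITIVE_CONF_KEY, false)}.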
180   */
181  public static final String LOCALITY_SENSITIVE_CONF_KEY =
182    "hbase.bulkload.locality.sensitive.enabled";
183  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
184  static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
185  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
186    "hbase.mapreduce.use.multi.table.hfileoutputformat";
187
188  /**
189   * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
190   * for internal usage in jobs like WALPlayer which need to use features of ExtendedCell.
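   * <p>
   * Jobs that need it can opt in before submission, e.g. (a sketch):
   * {@code job.getConfiguration().setBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY, true)}.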
191   */
192  @InterfaceAudience.Private
193  public static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
194    "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
  static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFAULT = false;
196
197  public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
198  public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
199    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
200  public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY =
201    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper." + HConstants.CLIENT_PORT_STR;
202  public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
203    REMOTE_CLUSTER_CONF_PREFIX + HConstants.ZOOKEEPER_ZNODE_PARENT;
204
205  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
206  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
207
208  @Override
209  public RecordWriter<ImmutableBytesWritable, Cell>
210    getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
211    return createRecordWriter(context, this.getOutputCommitter(context));
212  }
213
214  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
215    return combineTableNameSuffix(tableName, family);
216  }
217
218  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
219    final TaskAttemptContext context, final OutputCommitter committer) throws IOException {
220
221    // Get the path of the temporary output file
222    final Path outputDir = ((FileOutputCommitter) committer).getWorkPath();
223    final Configuration conf = context.getConfiguration();
224    final boolean writeMultipleTables =
225      conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
226    final boolean writeToTableWithNamespace = conf.getBoolean(
227      TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_DEFAULT_VALUE);
228    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
229    if (writeTableNames == null || writeTableNames.isEmpty()) {
230      throw new IllegalArgumentException("" + OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
231    }
232    final FileSystem fs = outputDir.getFileSystem(conf);
233    // These configs. are from hbase-*.xml
234    final long maxsize =
235      conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
236    // Invented config. Add to hbase-*.xml if other than default compression.
237    final String defaultCompressionStr =
238      conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
239    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
240    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
241    final Algorithm overriddenCompression =
242      compressionStr != null ? Compression.getCompressionAlgorithmByName(compressionStr) : null;
243    final boolean compactionExclude =
244      conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
245    final Set<String> allTableNames = Arrays
246      .stream(writeTableNames.split(Bytes.toString(tableSeparator))).collect(Collectors.toSet());
247
248    // create a map from column family to the compression algorithm
249    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
250    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
251    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
252    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
253
254    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
255    final Map<byte[], DataBlockEncoding> datablockEncodingMap =
256      createFamilyDataBlockEncodingMap(conf);
257    final DataBlockEncoding overriddenEncoding =
258      dataBlockEncodingStr != null ? DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;
259
260    return new RecordWriter<ImmutableBytesWritable, V>() {
261      // Map of families to writers and how much has been output on the writer.
262      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
263      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
264      private final long now = EnvironmentEdgeManager.currentTime();
265
266      @Override
267      public void write(ImmutableBytesWritable row, V cell) throws IOException {
268        Cell kv = cell;
269        // null input == user explicitly wants to flush
270        if (row == null && kv == null) {
271          rollWriters(null);
272          return;
273        }
274
275        byte[] rowKey = CellUtil.cloneRow(kv);
276        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
277        byte[] family = CellUtil.cloneFamily(kv);
278        byte[] tableNameBytes = null;
279        if (writeMultipleTables) {
280          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
281          tableNameBytes = writeToTableWithNamespace
282            ? TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
283              .getBytes(Charset.defaultCharset())
284            : TableName.valueOf(tableNameBytes).toBytes();
285          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
286            throw new IllegalArgumentException(
287              "TableName " + Bytes.toString(tableNameBytes) + " not expected");
288          }
289        } else {
290          tableNameBytes = Bytes.toBytes(writeTableNames);
291        }
292        Path tableRelPath = getTableRelativePath(tableNameBytes);
293        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
294
295        WriterLength wl = this.writers.get(tableAndFamily);
296
        // If this is a new column family, create its directory and configure its storage policy
298        if (wl == null) {
299          Path writerPath = null;
300          if (writeMultipleTables) {
301            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
302          } else {
303            writerPath = new Path(outputDir, Bytes.toString(family));
304          }
305          fs.mkdirs(writerPath);
306          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
307        }
308
        // Roll the writer once the size limit is reached, but only at a row boundary
310        if (
311          wl != null && wl.written + length >= maxsize
312            && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0
313        ) {
314          rollWriters(wl);
315        }
316
        // create a new HFile writer, if necessary
318        if (wl == null || wl.writer == null) {
319          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
320            HRegionLocation loc = null;
321            String tableName = Bytes.toString(tableNameBytes);
322            if (tableName != null) {
323              try (
324                Connection connection =
325                  ConnectionFactory.createConnection(createRemoteClusterConf(conf));
326                RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
327                loc = locator.getRegionLocation(rowKey);
328              } catch (Throwable e) {
329                LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),
330                  tableName, e);
331                loc = null;
332              }
333            }
334
335            if (null == loc) {
336              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
337              wl = getNewWriter(tableNameBytes, family, conf, null);
338            } else {
339              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
340              InetSocketAddress initialIsa =
341                new InetSocketAddress(loc.getHostname(), loc.getPort());
342              if (initialIsa.isUnresolved()) {
343                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
344                wl = getNewWriter(tableNameBytes, family, conf, null);
345              } else {
346                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
347                wl = getNewWriter(tableNameBytes, family, conf,
348                  new InetSocketAddress[] { initialIsa });
349              }
350            }
351          } else {
352            wl = getNewWriter(tableNameBytes, family, conf, null);
353          }
354        }
355
        // we now have the proper HFile writer. full steam ahead
357        PrivateCellUtil.updateLatestStamp(cell, this.now);
358        wl.writer.append(kv);
359        wl.written += length;
360
        // Copy the row so we can detect when the row transitions.
362        this.previousRows.put(family, rowKey);
363      }
364
365      private Path getTableRelativePath(byte[] tableNameBytes) {
366        String tableName = Bytes.toString(tableNameBytes);
367        String[] tableNameParts = tableName.split(":");
368        Path tableRelPath = new Path(tableName.split(":")[0]);
369        if (tableNameParts.length > 1) {
370          tableRelPath = new Path(tableRelPath, tableName.split(":")[1]);
371        }
372        return tableRelPath;
373      }
374
375      private void rollWriters(WriterLength writerLength) throws IOException {
376        if (writerLength != null) {
377          closeWriter(writerLength);
378        } else {
379          for (WriterLength wl : this.writers.values()) {
380            closeWriter(wl);
381          }
382        }
383      }
384
385      private void closeWriter(WriterLength wl) throws IOException {
386        if (wl.writer != null) {
387          LOG.info(
388            "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
389          close(wl.writer);
390          wl.writer = null;
391        }
392        wl.written = 0;
393      }
394
395      private Configuration createRemoteClusterConf(Configuration conf) {
396        final Configuration newConf = new Configuration(conf);
397
398        final String quorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY);
399        final String clientPort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY);
400        final String parent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY);
401
402        if (quorum != null && clientPort != null && parent != null) {
403          newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
404          newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(clientPort));
405          newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent);
406        }
407
408        for (Entry<String, String> entry : conf) {
409          String key = entry.getKey();
410          if (
411            REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY.equals(key)
412              || REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY.equals(key)
413              || REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY.equals(key)
414          ) {
415            // Handled them above
416            continue;
417          }
418
419          if (entry.getKey().startsWith(REMOTE_CLUSTER_CONF_PREFIX)) {
420            String originalKey = entry.getKey().substring(REMOTE_CLUSTER_CONF_PREFIX.length());
421            if (!originalKey.isEmpty()) {
422              newConf.set(originalKey, entry.getValue());
423            }
424          }
425        }
426
427        return newConf;
428      }
429
      /*
       * Create a new StoreFileWriter.
       * @return A WriterLength, containing a new StoreFileWriter.
       */
434      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
435          justification = "Not important")
436      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
437        InetSocketAddress[] favoredNodes) throws IOException {
438        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
439        Path familydir = new Path(outputDir, Bytes.toString(family));
440        if (writeMultipleTables) {
441          familydir =
442            new Path(outputDir, new Path(getTableRelativePath(tableName), Bytes.toString(family)));
443        }
444        WriterLength wl = new WriterLength();
445        Algorithm compression = overriddenCompression;
446        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
447        compression = compression == null ? defaultCompression : compression;
448        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
449        bloomType = bloomType == null ? BloomType.NONE : bloomType;
450        String bloomParam = bloomParamMap.get(tableAndFamily);
451        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
452          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
453        }
454        Integer blockSize = blockSizeMap.get(tableAndFamily);
455        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
456        DataBlockEncoding encoding = overriddenEncoding;
457        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
458        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
459        HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
460          .withDataBlockEncoding(encoding).withChecksumType(StoreUtils.getChecksumType(conf))
461          .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blockSize)
462          .withColumnFamily(family).withTableName(tableName)
463          .withCreateTime(EnvironmentEdgeManager.currentTime());
464
465        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
466          contextBuilder.withIncludesTags(true);
467        }
468
469        HFileContext hFileContext = contextBuilder.build();
470        if (null == favoredNodes) {
471          wl.writer =
472            new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs).withOutputDir(familydir)
473              .withBloomType(bloomType).withFileContext(hFileContext).build();
474        } else {
475          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
476            .withOutputDir(familydir).withBloomType(bloomType).withFileContext(hFileContext)
477            .withFavoredNodes(favoredNodes).build();
478        }
479
480        this.writers.put(tableAndFamily, wl);
481        return wl;
482      }
483
484      private void close(final StoreFileWriter w) throws IOException {
485        if (w != null) {
486          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
487          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
488          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
489          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
490          w.appendTrackedTimestampsToMetadata();
491          w.close();
492        }
493      }
494
495      @Override
496      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
497        for (WriterLength wl : this.writers.values()) {
498          close(wl.writer);
499        }
500      }
501    };
502  }
503
504  /**
505   * Configure block storage policy for CF after the directory is created.
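   * <p>
   * A per-family policy may be supplied through the conf, e.g. (table and family names
   * illustrative): {@code conf.set(STORAGE_POLICY_PROPERTY_CF_PREFIX + "mytable;cf", "ALL_SSD")},
   * with {@code conf.set(STORAGE_POLICY_PROPERTY, "HOT")} as the fallback for all families.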
506   */
507  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
508    byte[] tableAndFamily, Path cfPath) {
509    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
510      return;
511    }
512
513    String policy = conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
514      conf.get(STORAGE_POLICY_PROPERTY));
515    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
516  }
517
518  /*
519   * Data structure to hold a Writer and amount of data written on it.
520   */
521  static class WriterLength {
522    long written = 0;
523    StoreFileWriter writer = null;
524  }
525
526  /**
   * Return the start keys of all of the regions of the given tables, as a list of
   * ImmutableBytesWritable.
528   */
529  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
530    boolean writeMultipleTables) throws IOException {
531
532    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
533    for (RegionLocator regionLocator : regionLocators) {
534      TableName tableName = regionLocator.getName();
535      LOG.info("Looking up current regions for table " + tableName);
536      byte[][] byteKeys = regionLocator.getStartKeys();
537      for (byte[] byteKey : byteKeys) {
538        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
539        if (writeMultipleTables) {
540          // MultiTableHFileOutputFormat use case
541          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
542        }
543        if (LOG.isDebugEnabled()) {
544          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
545        }
546        ret.add(new ImmutableBytesWritable(fullKey));
547      }
548    }
549    return ret;
550  }
551
552  /**
553   * Write out a {@link SequenceFile} that can be read by {@link TotalOrderPartitioner} that
554   * contains the split points in startKeys.
555   */
556  @SuppressWarnings("deprecation")
557  private static void writePartitions(Configuration conf, Path partitionsPath,
558    List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
559    LOG.info("Writing partition information to " + partitionsPath);
560    if (startKeys.isEmpty()) {
561      throw new IllegalArgumentException("No regions passed");
562    }
563
564    // We're generating a list of split points, and we don't ever
565    // have keys < the first region (which has an empty start key)
566    // so we need to remove it. Otherwise we would end up with an
567    // empty reducer with index 0
568    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
569    ImmutableBytesWritable first = sorted.first();
570    if (writeMultipleTables) {
571      first =
572        new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
573    }
574    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
575      throw new IllegalArgumentException(
576        "First region of table should have empty start key. Instead has: "
577          + Bytes.toStringBinary(first.get()));
578    }
579    sorted.remove(sorted.first());
580
581    // Write the actual file
582    FileSystem fs = partitionsPath.getFileSystem(conf);
583    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath,
584      ImmutableBytesWritable.class, NullWritable.class);
585
586    try {
587      for (ImmutableBytesWritable startKey : sorted) {
588        writer.append(startKey, NullWritable.get());
589      }
590    } finally {
591      writer.close();
592    }
593  }
594
595  /**
596   * Configure a MapReduce Job to perform an incremental load into the given table. This
597   * <ul>
598   * <li>Inspects the table to configure a total order partitioner</li>
599   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
600   * <li>Sets the number of reduce tasks to match the current number of regions</li>
601   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either CellSortReducer,
   * PutSortReducer or TextSortReducer)</li>
   * <li>Sets the HBase cluster key used to load region locations for the locality-sensitive
   * feature</li>
605   * </ul>
606   * The user should be sure to set the map output value class to either KeyValue or Put before
607   * running this function.
608   */
609  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
610    throws IOException {
611    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
612    configureRemoteCluster(job, table.getConfiguration());
613  }
614
615  /**
616   * Configure a MapReduce Job to perform an incremental load into the given table. This
617   * <ul>
618   * <li>Inspects the table to configure a total order partitioner</li>
619   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
620   * <li>Sets the number of reduce tasks to match the current number of regions</li>
621   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either CellSortReducer,
   * PutSortReducer or TextSortReducer)</li>
624   * </ul>
625   * The user should be sure to set the map output value class to either KeyValue or Put before
626   * running this function.
627   */
628  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
629    RegionLocator regionLocator) throws IOException {
630    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
631    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
632    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
633  }
634
635  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
636    Class<? extends OutputFormat<?, ?>> cls) throws IOException {
637    Configuration conf = job.getConfiguration();
638    job.setOutputKeyClass(ImmutableBytesWritable.class);
639    job.setOutputValueClass(MapReduceExtendedCell.class);
640    job.setOutputFormatClass(cls);
641
642    final boolean writeToTableWithNamespace = conf.getBoolean(
643      TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_DEFAULT_VALUE);
644
645    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
646      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
647    }
648    boolean writeMultipleTables = false;
649    if (MultiTableHFileOutputFormat.class.equals(cls)) {
650      writeMultipleTables = true;
651      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
652    }
653    // Based on the configured map output class, set the correct reducer to properly
654    // sort the incoming values.
655    // TODO it would be nice to pick one or the other of these formats.
656    if (
657      KeyValue.class.equals(job.getMapOutputValueClass())
658        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
659    ) {
660      job.setReducerClass(CellSortReducer.class);
661    } else if (Put.class.equals(job.getMapOutputValueClass())) {
662      job.setReducerClass(PutSortReducer.class);
663    } else if (Text.class.equals(job.getMapOutputValueClass())) {
664      job.setReducerClass(TextSortReducer.class);
665    } else {
666      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
667    }
668
669    mergeSerializations(conf);
670
671    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
672      LOG.info("bulkload locality sensitive enabled");
673    }
674
675    /* Now get the region start keys for every table required */
676    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
677    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
678    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());
679
680    for (TableInfo tableInfo : multiTableInfo) {
681      regionLocators.add(tableInfo.getRegionLocator());
682      allTableNames.add(writeMultipleTables && writeToTableWithNamespace
683        ? tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString()
684        : tableInfo.getRegionLocator().getName().getNameAsString());
685      tableDescriptors.add(tableInfo.getTableDescriptor());
686    }
    // Record table names so writers can be created with favored nodes, and so per-table column
    // family attributes (compression, block size, etc.) can be decoded
689    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
690      StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
691    List<ImmutableBytesWritable> startKeys =
692      getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use the tables' region boundaries for TotalOrderPartitioner split points.
694    LOG.info("Configuring " + startKeys.size() + " reduce partitions "
695      + "to match current region count for all tables");
696    job.setNumReduceTasks(startKeys.size());
697
698    configurePartitioner(job, startKeys, writeMultipleTables);
    // Serialize per-family attributes (compression, block size, bloom type/param, encoding)
700
701    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
702      serializeColumnFamilyAttribute(compressionDetails, tableDescriptors));
703    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
704      serializeColumnFamilyAttribute(blockSizeDetails, tableDescriptors));
705    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
706      serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors));
707    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
708      serializeColumnFamilyAttribute(bloomParamDetails, tableDescriptors));
709    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
710      serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
711
712    TableMapReduceUtil.addDependencyJars(job);
713    TableMapReduceUtil.initCredentials(job);
714    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
715  }
716
717  private static void mergeSerializations(Configuration conf) {
718    List<String> serializations = new ArrayList<>();
719
720    // add any existing values that have been set
721    String[] existing = conf.getStrings("io.serializations");
722    if (existing != null) {
723      Collections.addAll(serializations, existing);
724    }
725
726    serializations.add(MutationSerialization.class.getName());
727    serializations.add(ResultSerialization.class.getName());
728
729    // Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's
730    // SerializationFactory runs through serializations in the order they are registered.
731    // We want to register ExtendedCellSerialization before CellSerialization because both
732    // work for ExtendedCells but only ExtendedCellSerialization handles them properly.
733    if (
734      conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
        EXTENDED_CELL_SERIALIZATION_ENABLED_DEFAULT)
736    ) {
737      serializations.add(ExtendedCellSerialization.class.getName());
738    }
739    serializations.add(CellSerialization.class.getName());
740
741    conf.setStrings("io.serializations", serializations.toArray(new String[0]));
742  }
743
744  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
745    throws IOException {
746    Configuration conf = job.getConfiguration();
747
748    job.setOutputKeyClass(ImmutableBytesWritable.class);
749    job.setOutputValueClass(MapReduceExtendedCell.class);
750    job.setOutputFormatClass(HFileOutputFormat2.class);
751
752    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
753    singleTableDescriptor.add(tableDescriptor);
754
755    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
756    // Set compression algorithms based on column families
757    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
758      serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
759    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
760      serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
761    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
762      serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
763    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
764      serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
765    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
766      serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
767
768    TableMapReduceUtil.addDependencyJars(job);
769    TableMapReduceUtil.initCredentials(job);
770    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
771  }
772
773  /**
   * Configure the HBase cluster key of the remote cluster used to load region locations when the
   * locality-sensitive feature is enabled. It's not necessary to call this method explicitly when
   * the cluster key of the HBase cluster used to load region locations is already configured in
   * the job configuration. Call this method when another HBase cluster key is configured in the
   * job configuration. For example, you should call it when you load data from HBase cluster A
   * using {@link TableInputFormat} and generate hfiles for HBase cluster B. Otherwise,
   * HFileOutputFormat2 fetches locations from cluster A and the locality-sensitive feature won't
   * work correctly.
781   * {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this method using
782   * {@link Table#getConfiguration} as clusterConf. See HBASE-25608.
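   * <p>
   * A hedged sketch of the cluster A to cluster B case above ({@code clusterAConf},
   * {@code clusterBConf} and the cluster B descriptor/locator variables are placeholders):
   *
   * <pre>{@code
   * // Read cells from cluster A, but generate HFiles laid out for cluster B's regions.
   * Job job = Job.getInstance(clusterAConf, "hfiles-for-cluster-b");
   * FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles-for-b"));
   * HFileOutputFormat2.configureIncrementalLoad(job, tableBDescriptor, tableBRegionLocator);
   * // Point locality lookups at cluster B rather than the job's (cluster A) configuration.
   * HFileOutputFormat2.configureRemoteCluster(job, clusterBConf);
   * }</pre>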
783   * @param job         which has configuration to be updated
784   * @param clusterConf which contains cluster key of the HBase cluster to be locality-sensitive
785   * @see #configureIncrementalLoad(Job, Table, RegionLocator)
786   * @see #LOCALITY_SENSITIVE_CONF_KEY
787   * @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
788   * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
789   * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
790   */
791  public static void configureRemoteCluster(Job job, Configuration clusterConf) {
792    Configuration conf = job.getConfiguration();
793
794    if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
795      return;
796    }
797
798    final String quorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
799    final int clientPort = clusterConf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
800      HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
801    final String parent =
802      clusterConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
803
804    conf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, quorum);
805    conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
806    conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);
807
808    LOG.info("ZK configs for remote cluster of bulkload is configured: " + quorum + ":" + clientPort
809      + "/" + parent);
810  }
811
812  /**
813   * Runs inside the task to deserialize column family to compression algorithm map from the
814   * configuration.
815   * @param conf to read the serialized values from
816   * @return a map from column family to the configured compression algorithm
817   */
818  @InterfaceAudience.Private
819  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
820    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
821    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
822    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
823      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
824      compressionMap.put(e.getKey(), algorithm);
825    }
826    return compressionMap;
827  }
828
829  /**
830   * Runs inside the task to deserialize column family to bloom filter type map from the
831   * configuration.
832   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
834   */
835  @InterfaceAudience.Private
836  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
837    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY);
838    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
839    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
840      BloomType bloomType = BloomType.valueOf(e.getValue());
841      bloomTypeMap.put(e.getKey(), bloomType);
842    }
843    return bloomTypeMap;
844  }
845
846  /**
847   * Runs inside the task to deserialize column family to bloom filter param map from the
848   * configuration.
849   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
851   */
852  @InterfaceAudience.Private
853  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
854    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
855  }
856
857  /**
858   * Runs inside the task to deserialize column family to block size map from the configuration.
859   * @param conf to read the serialized values from
860   * @return a map from column family to the configured block size
861   */
862  @InterfaceAudience.Private
863  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
864    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY);
865    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
866    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
867      Integer blockSize = Integer.parseInt(e.getValue());
868      blockSizeMap.put(e.getKey(), blockSize);
869    }
870    return blockSizeMap;
871  }
872
873  /**
874   * Runs inside the task to deserialize column family to data block encoding type map from the
875   * configuration.
876   * @param conf to read the serialized values from
   * @return a map from column family to the configured data block encoding for that family
879   */
880  @InterfaceAudience.Private
881  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(Configuration conf) {
882    Map<byte[], String> stringMap =
883      createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
884    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
885    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
886      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
887    }
888    return encoderMap;
889  }
890
891  /**
   * Runs inside the task to deserialize the column family to conf value map for the given key.
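   * <p>
   * The serialized form produced by {@link #serializeColumnFamilyAttribute} is ampersand-separated
   * URL-encoded {@code key=value} pairs whose key is the table name and family joined by
   * {@code ";"}, e.g. (values illustrative): {@code mytable%3Bcf1=GZ&mytable%3Bcf2=NONE}.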
893   * @param conf     to read the serialized values from
894   * @param confName conf key to read from the configuration
895   * @return a map of column family to the given configuration value
896   */
897  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
898    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
899    String confVal = conf.get(confName, "");
900    for (String familyConf : confVal.split("&")) {
901      String[] familySplit = familyConf.split("=");
902      if (familySplit.length != 2) {
903        continue;
904      }
905      try {
906        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
907          URLDecoder.decode(familySplit[1], "UTF-8"));
908      } catch (UnsupportedEncodingException e) {
909        // will not happen with UTF-8 encoding
910        throw new AssertionError(e);
911      }
912    }
913    return confValMap;
914  }
915
916  /**
917   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
919   */
920  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
921    boolean writeMultipleTables) throws IOException {
922    Configuration conf = job.getConfiguration();
923    // create the partitions file
924    FileSystem fs = FileSystem.get(conf);
925    String hbaseTmpFsDir =
926      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
927    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
928    fs.makeQualified(partitionsPath);
929    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
930    fs.deleteOnExit(partitionsPath);
931
932    // configure job to use it
933    job.setPartitionerClass(TotalOrderPartitioner.class);
934    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
935  }
936
937  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
938      value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
939  @InterfaceAudience.Private
940  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
941    List<TableDescriptor> allTables) throws UnsupportedEncodingException {
942    StringBuilder attributeValue = new StringBuilder();
943    int i = 0;
944    for (TableDescriptor tableDescriptor : allTables) {
945      if (tableDescriptor == null) {
946        // could happen with mock table instance
947        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
948        return "";
949      }
950      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
951        if (i++ > 0) {
952          attributeValue.append('&');
953        }
954        attributeValue.append(URLEncoder
955          .encode(Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
956            familyDescriptor.getName())), "UTF-8"));
957        attributeValue.append('=');
958        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
959      }
960    }
    // No trailing ampersand: '&' is only prepended between entries above
    return attributeValue.toString();
963  }
964
965  /**
966   * Serialize column family to compression algorithm map to configuration. Invoked while
967   * configuring the MR job for incremental load.
968   */
969  @InterfaceAudience.Private
970  static Function<ColumnFamilyDescriptor, String> compressionDetails =
971    familyDescriptor -> familyDescriptor.getCompressionType().getName();
972
973  /**
974   * Serialize column family to block size map to configuration. Invoked while configuring the MR
975   * job for incremental load.
976   */
977  @InterfaceAudience.Private
978  static Function<ColumnFamilyDescriptor, String> blockSizeDetails =
979    familyDescriptor -> String.valueOf(familyDescriptor.getBlocksize());
980
981  /**
982   * Serialize column family to bloom type map to configuration. Invoked while configuring the MR
983   * job for incremental load.
984   */
985  @InterfaceAudience.Private
986  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER;
    }
    return bloomType.toString();
992  };
993
994  /**
995   * Serialize column family to bloom param map to configuration. Invoked while configuring the MR
996   * job for incremental load.
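   * <p>
   * For ROWPREFIX_FIXED_LENGTH the serialized param is the configured prefix length, read from the
   * family's {@code BloomFilterUtil.PREFIX_LENGTH_KEY} configuration value (e.g. {@code "10"},
   * illustrative); other bloom types serialize an empty string.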
997   */
998  @InterfaceAudience.Private
999  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
1000    BloomType bloomType = familyDescriptor.getBloomFilterType();
1001    String bloomParam = "";
1002    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
1003      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
1004    }
1005    return bloomParam;
1006  };
1007
1008  /**
1009   * Serialize column family to data block encoding map to configuration. Invoked while configuring
1010   * the MR job for incremental load.
1011   */
1012  @InterfaceAudience.Private
1013  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
1014    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
1015    if (encoding == null) {
1016      encoding = DataBlockEncoding.NONE;
1017    }
1018    return encoding.toString();
1019  };
1020
1021}