/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes HFiles. Passed Cells must arrive in order. Writes current time as the sequence id for the
 * file. Sets the major compacted attribute on created {@link HFile}s. Calling write(null, null)
 * will forcibly roll all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done using
 * {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
 */
@InterfaceAudience.Public
public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);

  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    /**
     * Modifications to the returned HTD do not affect the inner TD.
     * @return A clone of inner table descriptor
     * @deprecated since 2.0.0 and will be removed in 3.0.0. Use {@link #getTableDescriptor()}
     *             instead.
     * @see #getTableDescriptor()
     * @see <a href="https://issues.apache.org/jira/browse/HBASE-18241">HBASE-18241</a>
     */
    @Deprecated
    public HTableDescriptor getHTableDescriptor() {
      return new HTableDescriptor(tableDescriptor);
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

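  // A minimal, illustrative sketch (not part of the original source) of forcing one compression
  // algorithm and data block encoding for every HFile written, via the public override keys
  // declared further below (COMPRESSION_OVERRIDE_CONF_KEY / DATABLOCK_ENCODING_OVERRIDE_CONF_KEY).
  // The chosen algorithm and encoding are examples only.
  //
  //   Configuration conf = job.getConfiguration();
  //   conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY,
  //     Compression.Algorithm.GZ.getName());
  //   conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_OVERRIDE_CONF_KEY,
  //     DataBlockEncoding.FAST_DIFF.name());
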
  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
    "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // When MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY is enabled, should table names be written
  // with namespace included. Enabling this means downstream jobs which use this output will
  // need to account for namespace when finding the directory of the job output.
  // For example: a table named my-table in namespace default would be in `/output/default/my-table`
  // instead of current `/output/my-table`
  // This will be the behavior when upgrading to hbase 3.0.
  public static final String TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY =
    "hbase.hfileoutputformat.tablename.namespace.inclusive";

  private static final boolean TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_DEFAULT_VALUE = false;

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol.
  // It is present for backwards compatibility reasons. Use it only to
  // override the auto-detection of datablock encoding and compression.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.compression";

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
    "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
    "hbase.mapreduce.use.multi.table.hfileoutputformat";

  /**
   * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
   * for internal usage in jobs like WALPlayer which need to use features of ExtendedCell.
   */
  @InterfaceAudience.Private
  public static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
    "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
  static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;

  public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper." + HConstants.CLIENT_PORT_STR;
  public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + HConstants.ZOOKEEPER_ZNODE_PARENT;

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell>
    getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
    final TaskAttemptContext context, final OutputCommitter committer) throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((FileOutputCommitter) committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
      conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final boolean writeToTableWithNamespace = conf.getBoolean(
      TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_DEFAULT_VALUE);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException("" + OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize =
      conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr =
      conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression =
      compressionStr != null ? Compression.getCompressionAlgorithmByName(compressionStr) : null;
    final boolean compactionExclude =
      conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
    final Set<String> allTableNames = Arrays
      .stream(writeTableNames.split(Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap =
      createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding =
      dataBlockEncodingStr != null ? DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final long now = EnvironmentEdgeManager.currentTime();

      @Override
      public void write(ImmutableBytesWritable row, V cell) throws IOException {
        Cell kv = cell;
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters(null);
          return;
        }

        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        byte[] tableNameBytes = null;
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          tableNameBytes = writeToTableWithNamespace
            ? TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
              .getBytes(Charset.defaultCharset())
            : TableName.valueOf(tableNameBytes).toBytes();
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException(
              "TableName " + Bytes.toString(tableNameBytes) + " not expected");
          }
        } else {
          tableNameBytes = Bytes.toBytes(writeTableNames);
        }
        Path tableRelPath = getTableRelativePath(tableNameBytes);
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, make sure the output directory exists
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        // This can only happen once a row is finished though
        if (
          wl != null && wl.written + length >= maxsize
            && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0
        ) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;
            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              try (
                Connection connection =
                  ConnectionFactory.createConnection(createRemoteClusterConf(conf));
                RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),
                  tableName, e);
                loc = null;
              }
            }

            if (null == loc) {
              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
              wl = getNewWriter(tableNameBytes, family, conf, null);
            } else {
              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
              InetSocketAddress initialIsa =
                new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
                wl = getNewWriter(tableNameBytes, family, conf, null);
              } else {
                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
                wl = getNewWriter(tableNameBytes, family, conf,
                  new InetSocketAddress[] { initialIsa });
              }
            }
          } else {
            wl = getNewWriter(tableNameBytes, family, conf, null);
          }
        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(cell, this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when a row transitions.
        this.previousRows.put(family, rowKey);
      }

      private Path getTableRelativePath(byte[] tableNameBytes) {
        String tableName = Bytes.toString(tableNameBytes);
        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableName.split(":")[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableName.split(":")[1]);
        }
        return tableRelPath;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info(
            "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
          close(wl.writer);
          wl.writer = null;
        }
        wl.written = 0;
      }

      private Configuration createRemoteClusterConf(Configuration conf) {
        final Configuration newConf = new Configuration(conf);

        final String quorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY);
        final String clientPort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY);
        final String parent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY);

        if (quorum != null && clientPort != null && parent != null) {
          newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
          newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(clientPort));
          newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent);
        }

        for (Entry<String, String> entry : conf) {
          String key = entry.getKey();
          if (
            REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY.equals(key)
          ) {
            // Handled them above
            continue;
          }

          if (entry.getKey().startsWith(REMOTE_CLUSTER_CONF_PREFIX)) {
            String originalKey = entry.getKey().substring(REMOTE_CLUSTER_CONF_PREFIX.length());
            if (!originalKey.isEmpty()) {
              newConf.set(originalKey, entry.getValue());
            }
          }
        }

        return newConf;
      }

      /*
       * Create a new StoreFile.Writer.
       * @return A WriterLength, containing a new StoreFile.Writer.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
        justification = "Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
        InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir =
            new Path(outputDir, new Path(getTableRelativePath(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
          .withDataBlockEncoding(encoding).withChecksumType(StoreUtils.getChecksumType(conf))
          .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blockSize)
          .withColumnFamily(family).withTableName(tableName)
          .withCreateTime(EnvironmentEdgeManager.currentTime());

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer =
            new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs).withOutputDir(familydir)
              .withBloomType(bloomType).withFileContext(hFileContext).build();
        } else {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
            .withOutputDir(familydir).withBloomType(bloomType).withFileContext(hFileContext)
            .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
    byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy = conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
      conf.get(STORAGE_POLICY_PROPERTY));
    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }

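  // An illustrative sketch (not part of the original source) of requesting a block storage policy
  // for the generated HFiles, consumed by configureStoragePolicy(...) above. The table name,
  // family name and the "ALL_SSD" policy are examples; the per-CF key is the policy property plus
  // the "<table>;<family>" suffix built by combineTableNameSuffix.
  //
  //   Configuration conf = job.getConfiguration();
  //   // Default policy for all column families:
  //   conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "HOT");
  //   // Override for a single table;family combination:
  //   conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "my_table;cf1", "ALL_SSD");
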
  /**
   * Return the start keys of all of the regions in this table, as a list of
   * ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
    boolean writeMultipleTables) throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} that can be read by {@link TotalOrderPartitioner} and that
   * contains the split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
    List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first =
        new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
        "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath,
      ImmutableBytesWritable.class, NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

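  // A hedged, illustrative driver-side sketch (not part of the original source) of the
  // incremental-load setup described in the class Javadoc and performed by configureIncrementalLoad
  // below. MyMapper, the table name and the output path are placeholders.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Job job = Job.getInstance(conf, "prepare-bulkload");
  //   job.setMapperClass(MyMapper.class); // emits (ImmutableBytesWritable, Put) pairs
  //   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  //   job.setMapOutputValueClass(Put.class);
  //   FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles"));
  //   try (Connection conn = ConnectionFactory.createConnection(conf);
  //     Table table = conn.getTable(TableName.valueOf("my_table"));
  //     RegionLocator locator = conn.getRegionLocator(TableName.valueOf("my_table"))) {
  //     HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
  //   }
  //   job.waitForCompletion(true);
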
  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   * PutSortReducer)</li>
   * <li>Sets the HBase cluster key to load region locations for locality-sensitive</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
    throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    configureRemoteCluster(job, table.getConfiguration());
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   * PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
    RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
    Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    final boolean writeToTableWithNamespace = conf.getBoolean(
      TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_DEFAULT_VALUE);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (
      KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
    ) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    mergeSerializations(conf);

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      allTableNames.add(writeMultipleTables && writeToTableWithNamespace
        ? tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString()
        : tableInfo.getRegionLocator().getName().getNameAsString());
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record table names for creating writers by favored nodes, and for decoding compression,
    // block size and other column family attributes per table
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
      StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
      getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use table's region boundaries for TOP split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions "
      + "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Set compression algorithms based on column families

    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }

  private static void mergeSerializations(Configuration conf) {
    List<String> serializations = new ArrayList<>();

    // add any existing values that have been set
    String[] existing = conf.getStrings("io.serializations");
    if (existing != null) {
      Collections.addAll(serializations, existing);
    }

    serializations.add(MutationSerialization.class.getName());
    serializations.add(ResultSerialization.class.getName());

    // Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's
    // SerializationFactory runs through serializations in the order they are registered.
    // We want to register ExtendedCellSerialization before CellSerialization because both
    // work for ExtendedCells but only ExtendedCellSerialization handles them properly.
    if (
      conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
        EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT)
    ) {
      serializations.add(ExtendedCellSerialization.class.getName());
    }
    serializations.add(CellSerialization.class.getName());

    conf.setStrings("io.serializations", serializations.toArray(new String[0]));
  }

  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
    throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Set compression algorithms based on column families
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

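  // An illustrative sketch (not part of the original source) of the cross-cluster case described
  // in the Javadoc of configureRemoteCluster below: reading from cluster A with TableInputFormat
  // while generating HFiles for cluster B. Variable and table names are placeholders.
  //
  //   Job job = Job.getInstance(clusterAConf, "copy-a-to-b"); // job conf points at cluster A
  //   TableMapReduceUtil.initTableMapperJob("source_table", scan, MyMapper.class,
  //     ImmutableBytesWritable.class, Put.class, job);
  //   try (Connection connB = ConnectionFactory.createConnection(clusterBConf);
  //     Table tableB = connB.getTable(TableName.valueOf("dest_table"));
  //     RegionLocator locatorB = connB.getRegionLocator(TableName.valueOf("dest_table"))) {
  //     HFileOutputFormat2.configureIncrementalLoad(job, tableB.getDescriptor(), locatorB);
  //     // Point locality lookups at destination cluster B rather than at the job's cluster A conf:
  //     HFileOutputFormat2.configureRemoteCluster(job, clusterBConf);
  //   }
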
  /**
   * Configure the HBase cluster key for a remote cluster, used to load region locations when
   * locality-sensitive writing is enabled. It is not necessary to call this method explicitly when
   * the cluster key of the HBase cluster to be used for region locations is already configured in
   * the job configuration. Call this method when a different HBase cluster key is configured in
   * the job configuration, for example when you load data from HBase cluster A using
   * {@link TableInputFormat} and generate hfiles for HBase cluster B. Otherwise, HFileOutputFormat2
   * fetches locations from cluster A and locality-sensitive writing won't work correctly.
   * {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this method using
   * {@link Table#getConfiguration} as clusterConf. See HBASE-25608.
   * @param job which has configuration to be updated
   * @param clusterConf which contains cluster key of the HBase cluster to be locality-sensitive
   * @see #configureIncrementalLoad(Job, Table, RegionLocator)
   * @see #LOCALITY_SENSITIVE_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
   */
  public static void configureRemoteCluster(Job job, Configuration clusterConf) {
    Configuration conf = job.getConfiguration();

    if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      return;
    }

    final String quorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
    final int clientPort = clusterConf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
      HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
    final String parent =
      clusterConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);

    conf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, quorum);
    conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
    conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);

    LOG.info("ZK configs for remote cluster of bulkload are configured: " + quorum + ":"
      + clientPort + "/" + parent);
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @InterfaceAudience.Private
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @InterfaceAudience.Private
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter param map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
   */
  @InterfaceAudience.Private
  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
  }

  /**
   * Runs inside the task to deserialize column family to block size map from the configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @InterfaceAudience.Private
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured data block encoding for the family
   */
  @InterfaceAudience.Private
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(Configuration conf) {
    Map<byte[], String> stringMap =
      createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
    }
    return encoderMap;
  }

  /**
   * Run inside the task to deserialize column family to given conf value map.
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
          URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

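  // An illustrative note (not part of the original source) on the format parsed by
  // createFamilyConfValueMap above and produced by serializeColumnFamilyAttribute below:
  // "<table>;<family>=<value>" pairs, URL-encoded and joined with '&'. The table and family names
  // here are examples only.
  //
  //   // e.g. hbase.hfileoutputformat.families.compression for table "t" with families "a" and "b"
  //   // might look like "t%3Ba=GZ&t%3Bb=NONE" (";" URL-encodes to "%3B"), and then:
  //   Map<byte[], Algorithm> compression = createFamilyCompressionMap(conf);
  //   Algorithm algoForA = compression.get(Bytes.toBytes("t;a")); // GZ
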
  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
    boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
    value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @InterfaceAudience.Private
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
    List<TableDescriptor> allTables) throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder
          .encode(Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
            familyDescriptor.getName())), "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    // The separator is only appended between entries, so there is no trailing ampersand to strip
    return attributeValue.toString();
  }

  /**
   * Serialize column family to compression algorithm map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> compressionDetails =
    familyDescriptor -> familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails =
    familyDescriptor -> String.valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    }
    return bloomParam;
  };

  /**
   * Serialize column family to data block encoding map to configuration. Invoked while configuring
   * the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };

}