001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNotSame; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027import static org.mockito.Mockito.verify; 028 029import java.io.IOException; 030import java.lang.reflect.Field; 031import java.security.PrivilegedAction; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.HashMap; 035import java.util.LinkedList; 036import java.util.List; 037import java.util.Map; 038import java.util.Map.Entry; 039import java.util.Random; 040import java.util.Set; 041import java.util.UUID; 042import java.util.concurrent.Callable; 043import java.util.concurrent.ConcurrentHashMap; 044import java.util.concurrent.CopyOnWriteArrayList; 045import java.util.concurrent.ExecutorService; 046import java.util.concurrent.ThreadLocalRandom; 047import java.util.stream.Collectors; 048import java.util.stream.Stream; 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.fs.FileStatus; 051import org.apache.hadoop.fs.FileSystem; 052import org.apache.hadoop.fs.LocatedFileStatus; 053import org.apache.hadoop.fs.Path; 054import org.apache.hadoop.fs.RemoteIterator; 055import org.apache.hadoop.hbase.ArrayBackedTag; 056import org.apache.hadoop.hbase.Cell; 057import org.apache.hadoop.hbase.CellUtil; 058import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 059import org.apache.hadoop.hbase.ExtendedCell; 060import org.apache.hadoop.hbase.HBaseClassTestRule; 061import org.apache.hadoop.hbase.HBaseConfiguration; 062import org.apache.hadoop.hbase.HBaseTestingUtil; 063import org.apache.hadoop.hbase.HConstants; 064import org.apache.hadoop.hbase.HDFSBlocksDistribution; 065import org.apache.hadoop.hbase.HadoopShims; 066import org.apache.hadoop.hbase.KeyValue; 067import org.apache.hadoop.hbase.PrivateCellUtil; 068import org.apache.hadoop.hbase.ServerName; 069import org.apache.hadoop.hbase.StartTestingClusterOption; 070import org.apache.hadoop.hbase.TableName; 071import org.apache.hadoop.hbase.Tag; 072import org.apache.hadoop.hbase.TagType; 073import org.apache.hadoop.hbase.client.Admin; 074import org.apache.hadoop.hbase.client.AsyncConnection; 075import org.apache.hadoop.hbase.client.BufferedMutator; 076import org.apache.hadoop.hbase.client.BufferedMutatorParams; 077import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 078import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 079import 
org.apache.hadoop.hbase.client.Connection; 080import org.apache.hadoop.hbase.client.ConnectionFactory; 081import org.apache.hadoop.hbase.client.ConnectionRegistry; 082import org.apache.hadoop.hbase.client.ConnectionUtils; 083import org.apache.hadoop.hbase.client.Hbck; 084import org.apache.hadoop.hbase.client.Put; 085import org.apache.hadoop.hbase.client.RegionLocator; 086import org.apache.hadoop.hbase.client.Result; 087import org.apache.hadoop.hbase.client.ResultScanner; 088import org.apache.hadoop.hbase.client.Scan; 089import org.apache.hadoop.hbase.client.Table; 090import org.apache.hadoop.hbase.client.TableBuilder; 091import org.apache.hadoop.hbase.client.TableDescriptor; 092import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 093import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 094import org.apache.hadoop.hbase.io.compress.Compression; 095import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 096import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 097import org.apache.hadoop.hbase.io.hfile.CacheConfig; 098import org.apache.hadoop.hbase.io.hfile.HFile; 099import org.apache.hadoop.hbase.io.hfile.HFile.Reader; 100import org.apache.hadoop.hbase.io.hfile.HFileScanner; 101import org.apache.hadoop.hbase.regionserver.BloomType; 102import org.apache.hadoop.hbase.regionserver.HRegion; 103import org.apache.hadoop.hbase.regionserver.HStore; 104import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem; 105import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; 106import org.apache.hadoop.hbase.security.User; 107import org.apache.hadoop.hbase.testclassification.LargeTests; 108import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; 109import org.apache.hadoop.hbase.tool.BulkLoadHFiles; 110import org.apache.hadoop.hbase.util.Bytes; 111import org.apache.hadoop.hbase.util.CommonFSUtils; 112import org.apache.hadoop.hbase.util.FSUtils; 113import org.apache.hadoop.hbase.util.FutureUtils; 114import org.apache.hadoop.hbase.util.ReflectionUtils; 115import org.apache.hadoop.hdfs.DistributedFileSystem; 116import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; 117import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 118import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; 119import org.apache.hadoop.io.NullWritable; 120import org.apache.hadoop.mapreduce.Job; 121import org.apache.hadoop.mapreduce.Mapper; 122import org.apache.hadoop.mapreduce.RecordWriter; 123import org.apache.hadoop.mapreduce.TaskAttemptContext; 124import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 125import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; 126import org.apache.hadoop.security.UserGroupInformation; 127import org.junit.Assert; 128import org.junit.ClassRule; 129import org.junit.Ignore; 130import org.junit.Test; 131import org.junit.experimental.categories.Category; 132import org.mockito.Mockito; 133import org.slf4j.Logger; 134import org.slf4j.LoggerFactory; 135 136/** 137 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile 138 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and 139 * values. 
140 */ 141@Category({ VerySlowMapReduceTests.class, LargeTests.class }) 142public class TestHFileOutputFormat2 { 143 144 @ClassRule 145 public static final HBaseClassTestRule CLASS_RULE = 146 HBaseClassTestRule.forClass(TestHFileOutputFormat2.class); 147 148 private final static int ROWSPERSPLIT = 1024; 149 private static final int DEFAULT_VALUE_LENGTH = 1000; 150 151 public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME; 152 private static final byte[][] FAMILIES = 153 { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) }; 154 private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", "TestTable3") 155 .map(TableName::valueOf).toArray(TableName[]::new); 156 157 private HBaseTestingUtil util = new HBaseTestingUtil(); 158 159 private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class); 160 161 /** 162 * Simple mapper that makes KeyValue output. 163 */ 164 static class RandomKVGeneratingMapper 165 extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> { 166 167 private int keyLength; 168 private static final int KEYLEN_DEFAULT = 10; 169 private static final String KEYLEN_CONF = "randomkv.key.length"; 170 171 private int valLength; 172 private static final int VALLEN_DEFAULT = 10; 173 private static final String VALLEN_CONF = "randomkv.val.length"; 174 private static final byte[] QUALIFIER = Bytes.toBytes("data"); 175 private boolean multiTableMapper = false; 176 private TableName[] tables = null; 177 178 @Override 179 protected void setup(Context context) throws IOException, InterruptedException { 180 super.setup(context); 181 182 Configuration conf = context.getConfiguration(); 183 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 184 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 185 multiTableMapper = 186 conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); 187 if (multiTableMapper) { 188 tables = TABLE_NAMES; 189 } else { 190 tables = new TableName[] { TABLE_NAMES[0] }; 191 } 192 } 193 194 @Override 195 protected void map(NullWritable n1, NullWritable n2, 196 Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context) 197 throws java.io.IOException, InterruptedException { 198 199 byte keyBytes[] = new byte[keyLength]; 200 byte valBytes[] = new byte[valLength]; 201 202 int taskId = context.getTaskAttemptID().getTaskID().getId(); 203 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 204 byte[] key; 205 for (int j = 0; j < tables.length; ++j) { 206 for (int i = 0; i < ROWSPERSPLIT; i++) { 207 Bytes.random(keyBytes); 208 // Ensure that unique tasks generate unique keys 209 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 210 Bytes.random(valBytes); 211 key = keyBytes; 212 if (multiTableMapper) { 213 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 214 } 215 216 for (byte[] family : TestHFileOutputFormat2.FAMILIES) { 217 Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); 218 context.write(new ImmutableBytesWritable(key), kv); 219 } 220 } 221 } 222 } 223 } 224 225 /** 226 * Simple mapper that makes Put output. 
227 */ 228 static class RandomPutGeneratingMapper 229 extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> { 230 231 private int keyLength; 232 private static final int KEYLEN_DEFAULT = 10; 233 private static final String KEYLEN_CONF = "randomkv.key.length"; 234 235 private int valLength; 236 private static final int VALLEN_DEFAULT = 10; 237 private static final String VALLEN_CONF = "randomkv.val.length"; 238 private static final byte[] QUALIFIER = Bytes.toBytes("data"); 239 private boolean multiTableMapper = false; 240 private TableName[] tables = null; 241 242 @Override 243 protected void setup(Context context) throws IOException, InterruptedException { 244 super.setup(context); 245 246 Configuration conf = context.getConfiguration(); 247 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 248 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 249 multiTableMapper = 250 conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); 251 if (multiTableMapper) { 252 tables = TABLE_NAMES; 253 } else { 254 tables = new TableName[] { TABLE_NAMES[0] }; 255 } 256 } 257 258 @Override 259 protected void map(NullWritable n1, NullWritable n2, 260 Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context) 261 throws java.io.IOException, InterruptedException { 262 263 byte keyBytes[] = new byte[keyLength]; 264 byte valBytes[] = new byte[valLength]; 265 266 int taskId = context.getTaskAttemptID().getTaskID().getId(); 267 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 268 269 byte[] key; 270 for (int j = 0; j < tables.length; ++j) { 271 for (int i = 0; i < ROWSPERSPLIT; i++) { 272 Bytes.random(keyBytes); 273 // Ensure that unique tasks generate unique keys 274 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 275 Bytes.random(valBytes); 276 key = keyBytes; 277 if (multiTableMapper) { 278 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 279 } 280 281 for (byte[] family : TestHFileOutputFormat2.FAMILIES) { 282 Put p = new Put(keyBytes); 283 p.addColumn(family, QUALIFIER, valBytes); 284 // set TTL to very low so that the scan does not return any value 285 p.setTTL(1l); 286 context.write(new ImmutableBytesWritable(key), p); 287 } 288 } 289 } 290 } 291 } 292 293 private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) { 294 if (putSortReducer) { 295 job.setInputFormatClass(NMapInputFormat.class); 296 job.setMapperClass(RandomPutGeneratingMapper.class); 297 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 298 job.setMapOutputValueClass(Put.class); 299 } else { 300 job.setInputFormatClass(NMapInputFormat.class); 301 job.setMapperClass(RandomKVGeneratingMapper.class); 302 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 303 job.setMapOutputValueClass(KeyValue.class); 304 } 305 } 306 307 /** 308 * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose 309 * timestamp is {@link HConstants#LATEST_TIMESTAMP}. 310 * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a> 311 */ 312 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 313 @Test 314 public void test_LATEST_TIMESTAMP_isReplaced() throws Exception { 315 Configuration conf = new Configuration(this.util.getConfiguration()); 316 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 317 TaskAttemptContext context = null; 318 Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced"); 319 try { 320 Job job = new Job(conf); 321 FileOutputFormat.setOutputPath(job, dir); 322 context = createTestTaskAttemptContext(job); 323 HFileOutputFormat2 hof = new HFileOutputFormat2(); 324 writer = hof.getRecordWriter(context); 325 final byte[] b = Bytes.toBytes("b"); 326 327 // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be 328 // changed by call to write. Check all in kv is same but ts. 329 KeyValue kv = new KeyValue(b, b, b); 330 KeyValue original = kv.clone(); 331 writer.write(new ImmutableBytesWritable(), kv); 332 assertFalse(original.equals(kv)); 333 assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv))); 334 assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv))); 335 assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv))); 336 assertNotSame(original.getTimestamp(), kv.getTimestamp()); 337 assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp()); 338 339 // Test 2. Now test passing a kv that has explicit ts. It should not be 340 // changed by call to record write. 341 kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b); 342 original = kv.clone(); 343 writer.write(new ImmutableBytesWritable(), kv); 344 assertTrue(original.equals(kv)); 345 } finally { 346 if (writer != null && context != null) writer.close(context); 347 dir.getFileSystem(conf).delete(dir, true); 348 } 349 } 350 351 private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception { 352 HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class); 353 TaskAttemptContext context = 354 hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0"); 355 return context; 356 } 357 358 /* 359 * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by 360 * time-restricted scans. 361 */ 362 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 363 @Test 364 public void test_TIMERANGE() throws Exception { 365 Configuration conf = new Configuration(this.util.getConfiguration()); 366 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 367 TaskAttemptContext context = null; 368 Path dir = util.getDataTestDir("test_TIMERANGE_present"); 369 LOG.info("Timerange dir writing to dir: " + dir); 370 try { 371 // build a record writer using HFileOutputFormat2 372 Job job = new Job(conf); 373 FileOutputFormat.setOutputPath(job, dir); 374 context = createTestTaskAttemptContext(job); 375 HFileOutputFormat2 hof = new HFileOutputFormat2(); 376 writer = hof.getRecordWriter(context); 377 378 // Pass two key values with explicit times stamps 379 final byte[] b = Bytes.toBytes("b"); 380 381 // value 1 with timestamp 2000 382 KeyValue kv = new KeyValue(b, b, b, 2000, b); 383 KeyValue original = kv.clone(); 384 writer.write(new ImmutableBytesWritable(), kv); 385 assertEquals(original, kv); 386 387 // value 2 with timestamp 1000 388 kv = new KeyValue(b, b, b, 1000, b); 389 original = kv.clone(); 390 writer.write(new ImmutableBytesWritable(), kv); 391 assertEquals(original, kv); 392 393 // verify that the file has the proper FileInfo. 
394 writer.close(context); 395 396 // the generated file lives 1 directory down from the attempt directory 397 // and is the only file, e.g. 398 // _attempt__0000_r_000000_0/b/1979617994050536795 399 FileSystem fs = FileSystem.get(conf); 400 Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent(); 401 FileStatus[] sub1 = fs.listStatus(attemptDirectory); 402 FileStatus[] file = fs.listStatus(sub1[0].getPath()); 403 404 // open as HFile Reader and pull out TIMERANGE FileInfo. 405 HFile.Reader rd = 406 HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf); 407 Map<byte[], byte[]> finfo = rd.getHFileInfo(); 408 byte[] range = finfo.get(Bytes.toBytes("TIMERANGE")); 409 assertNotNull(range); 410 411 // unmarshall and check values. 412 TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range); 413 LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax()); 414 assertEquals(1000, timeRangeTracker.getMin()); 415 assertEquals(2000, timeRangeTracker.getMax()); 416 rd.close(); 417 } finally { 418 if (writer != null && context != null) writer.close(context); 419 dir.getFileSystem(conf).delete(dir, true); 420 } 421 } 422 423 /** 424 * Run small MR job. 425 */ 426 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 427 @Test 428 public void testWritingPEData() throws Exception { 429 Configuration conf = util.getConfiguration(); 430 Path testDir = util.getDataTestDirOnTestFS("testWritingPEData"); 431 FileSystem fs = testDir.getFileSystem(conf); 432 433 // Set down this value or we OOME in eclipse. 434 conf.setInt("mapreduce.task.io.sort.mb", 20); 435 // Write a few files. 436 long hregionMaxFilesize = 10 * 1024; 437 conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize); 438 439 Job job = new Job(conf, "testWritingPEData"); 440 setupRandomGeneratorMapper(job, false); 441 // This partitioner doesn't work well for number keys but using it anyways 442 // just to demonstrate how to configure it. 443 byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 444 byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 445 446 Arrays.fill(startKey, (byte) 0); 447 Arrays.fill(endKey, (byte) 0xff); 448 449 job.setPartitionerClass(SimpleTotalOrderPartitioner.class); 450 // Set start and end rows for partitioner. 451 SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey); 452 SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey); 453 job.setReducerClass(CellSortReducer.class); 454 job.setOutputFormatClass(HFileOutputFormat2.class); 455 job.setNumReduceTasks(4); 456 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 457 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 458 CellSerialization.class.getName()); 459 460 FileOutputFormat.setOutputPath(job, testDir); 461 assertTrue(job.waitForCompletion(false)); 462 FileStatus[] files = fs.listStatus(testDir); 463 assertTrue(files.length > 0); 464 465 // check output file num and size. 
466 for (byte[] family : FAMILIES) { 467 long kvCount = 0; 468 RemoteIterator<LocatedFileStatus> iterator = 469 fs.listFiles(testDir.suffix("/" + new String(family)), true); 470 while (iterator.hasNext()) { 471 LocatedFileStatus keyFileStatus = iterator.next(); 472 HFile.Reader reader = 473 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 474 HFileScanner scanner = reader.getScanner(conf, false, false, false); 475 476 kvCount += reader.getEntries(); 477 scanner.seekTo(); 478 long perKVSize = scanner.getCell().getSerializedSize(); 479 assertTrue("Data size of each file should not be too large.", 480 perKVSize * reader.getEntries() <= hregionMaxFilesize); 481 } 482 assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount); 483 } 484 } 485 486 /** 487 * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile. 488 */ 489 @Test 490 public void test_WritingTagData() throws Exception { 491 Configuration conf = new Configuration(this.util.getConfiguration()); 492 final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version"; 493 conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); 494 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 495 TaskAttemptContext context = null; 496 Path dir = util.getDataTestDir("WritingTagData"); 497 try { 498 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 499 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 500 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 501 Job job = new Job(conf); 502 FileOutputFormat.setOutputPath(job, dir); 503 context = createTestTaskAttemptContext(job); 504 HFileOutputFormat2 hof = new HFileOutputFormat2(); 505 writer = hof.getRecordWriter(context); 506 final byte[] b = Bytes.toBytes("b"); 507 508 List<Tag> tags = new ArrayList<>(); 509 tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670))); 510 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags); 511 writer.write(new ImmutableBytesWritable(), kv); 512 writer.close(context); 513 writer = null; 514 FileSystem fs = dir.getFileSystem(conf); 515 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 516 while (iterator.hasNext()) { 517 LocatedFileStatus keyFileStatus = iterator.next(); 518 HFile.Reader reader = 519 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 520 HFileScanner scanner = reader.getScanner(conf, false, false, false); 521 scanner.seekTo(); 522 ExtendedCell cell = scanner.getCell(); 523 List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell); 524 assertTrue(tagsFromCell.size() > 0); 525 for (Tag tag : tagsFromCell) { 526 assertTrue(tag.getType() == TagType.TTL_TAG_TYPE); 527 } 528 } 529 } finally { 530 if (writer != null && context != null) writer.close(context); 531 dir.getFileSystem(conf).delete(dir, true); 532 } 533 } 534 535 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 536 @Test 537 public void testJobConfiguration() throws Exception { 538 Configuration conf = new Configuration(this.util.getConfiguration()); 539 conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, 540 util.getDataTestDir("testJobConfiguration").toString()); 541 Job job = new Job(conf); 542 job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration")); 543 Table table = Mockito.mock(Table.class); 544 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 545 setupMockStartKeys(regionLocator); 546 setupMockTableName(regionLocator); 547 HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); 548 assertEquals(job.getNumReduceTasks(), 4); 549 } 550 551 private byte[][] generateRandomStartKeys(int numKeys) { 552 Random random = ThreadLocalRandom.current(); 553 byte[][] ret = new byte[numKeys][]; 554 // first region start key is always empty 555 ret[0] = HConstants.EMPTY_BYTE_ARRAY; 556 for (int i = 1; i < numKeys; i++) { 557 ret[i] = generateData(random, DEFAULT_VALUE_LENGTH); 558 } 559 return ret; 560 } 561 562 /* 563 * This method takes some time and is done inline uploading data. For example, doing the mapfile 564 * test, generation of the key and value consumes about 30% of CPU time. 565 * @return Generated random value to insert into a table cell. 566 */ 567 public static byte[] generateData(final Random r, int length) { 568 byte[] b = new byte[length]; 569 int i; 570 571 for (i = 0; i < (length - 8); i += 8) { 572 b[i] = (byte) (65 + r.nextInt(26)); 573 b[i + 1] = b[i]; 574 b[i + 2] = b[i]; 575 b[i + 3] = b[i]; 576 b[i + 4] = b[i]; 577 b[i + 5] = b[i]; 578 b[i + 6] = b[i]; 579 b[i + 7] = b[i]; 580 } 581 582 byte a = (byte) (65 + r.nextInt(26)); 583 for (; i < length; i++) { 584 b[i] = a; 585 } 586 return b; 587 } 588 589 private byte[][] generateRandomSplitKeys(int numKeys) { 590 Random random = ThreadLocalRandom.current(); 591 byte[][] ret = new byte[numKeys][]; 592 for (int i = 0; i < numKeys; i++) { 593 ret[i] = generateData(random, DEFAULT_VALUE_LENGTH); 594 } 595 return ret; 596 } 597 598 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 599 @Test 600 public void testMRIncrementalLoad() throws Exception { 601 LOG.info("\nStarting test testMRIncrementalLoad\n"); 602 doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad"); 603 } 604 605 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 606 @Test 607 public void testMRIncrementalLoadWithSplit() throws Exception { 608 LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n"); 609 doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit"); 610 } 611 612 /** 613 * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true This test could only check the 614 * correctness of original logic if LOCALITY_SENSITIVE_CONF_KEY is set to true. Because 615 * MiniHBaseCluster always run with single hostname (and different ports), it's not possible to 616 * check the region locality by comparing region locations and DN hostnames. When MiniHBaseCluster 617 * supports explicit hostnames parameter (just like MiniDFSCluster does), we could test region 618 * locality features more easily. 619 */ 620 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 621 @Test 622 public void testMRIncrementalLoadWithLocality() throws Exception { 623 LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n"); 624 doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1"); 625 doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2"); 626 } 627 628 // @Ignore("Wahtevs") 629 @Test 630 public void testMRIncrementalLoadWithPutSortReducer() throws Exception { 631 LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n"); 632 doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer"); 633 } 634 635 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 636 boolean putSortReducer, String tableStr) throws Exception { 637 doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer, 638 Arrays.asList(tableStr)); 639 } 640 641 @Test 642 public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception { 643 LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n"); 644 doIncrementalLoadTest(false, false, true, 645 Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList())); 646 } 647 648 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 649 boolean putSortReducer, List<String> tableStr) throws Exception { 650 util = new HBaseTestingUtil(); 651 Configuration conf = util.getConfiguration(); 652 conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality); 653 int hostCount = 1; 654 int regionNum = 5; 655 if (shouldKeepLocality) { 656 // We should change host count higher than hdfs replica count when MiniHBaseCluster supports 657 // explicit hostnames parameter just like MiniDFSCluster does. 
658 hostCount = 3; 659 regionNum = 20; 660 } 661 662 String[] hostnames = new String[hostCount]; 663 for (int i = 0; i < hostCount; ++i) { 664 hostnames[i] = "datanode_" + i; 665 } 666 StartTestingClusterOption option = StartTestingClusterOption.builder() 667 .numRegionServers(hostCount).dataNodeHosts(hostnames).build(); 668 util.startMiniCluster(option); 669 670 Map<String, Table> allTables = new HashMap<>(tableStr.size()); 671 List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size()); 672 boolean writeMultipleTables = tableStr.size() > 1; 673 for (String tableStrSingle : tableStr) { 674 byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1); 675 TableName tableName = TableName.valueOf(tableStrSingle); 676 Table table = util.createTable(tableName, FAMILIES, splitKeys); 677 678 RegionLocator r = util.getConnection().getRegionLocator(tableName); 679 assertEquals("Should start with empty table", 0, util.countRows(table)); 680 int numRegions = r.getStartKeys().length; 681 assertEquals("Should make " + regionNum + " regions", numRegions, regionNum); 682 683 allTables.put(tableStrSingle, table); 684 tableInfo.add(new HFileOutputFormat2.TableInfo(table.getDescriptor(), r)); 685 } 686 Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); 687 // Generate the bulk load files 688 runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer); 689 if (writeMultipleTables) { 690 testDir = new Path(testDir, "default"); 691 } 692 693 for (Table tableSingle : allTables.values()) { 694 // This doesn't write into the table, just makes files 695 assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle)); 696 } 697 int numTableDirs = 0; 698 FileStatus[] fss = testDir.getFileSystem(conf).listStatus(testDir); 699 for (FileStatus tf : fss) { 700 Path tablePath = testDir; 701 if (writeMultipleTables) { 702 if (allTables.containsKey(tf.getPath().getName())) { 703 ++numTableDirs; 704 tablePath = tf.getPath(); 705 } else { 706 continue; 707 } 708 } 709 710 // Make sure that a directory was created for every CF 711 int dir = 0; 712 fss = tablePath.getFileSystem(conf).listStatus(tablePath); 713 for (FileStatus f : fss) { 714 for (byte[] family : FAMILIES) { 715 if (Bytes.toString(family).equals(f.getPath().getName())) { 716 ++dir; 717 } 718 } 719 } 720 assertEquals("Column family not found in FS.", FAMILIES.length, dir); 721 } 722 if (writeMultipleTables) { 723 assertEquals("Dir for all input tables not created", numTableDirs, allTables.size()); 724 } 725 726 Admin admin = util.getConnection().getAdmin(); 727 try { 728 // handle the split case 729 if (shouldChangeRegions) { 730 Table chosenTable = allTables.values().iterator().next(); 731 // Choose a semi-random table if multiple tables are available 732 LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString()); 733 admin.disableTable(chosenTable.getName()); 734 util.waitUntilNoRegionsInTransition(); 735 736 util.deleteTable(chosenTable.getName()); 737 byte[][] newSplitKeys = generateRandomSplitKeys(14); 738 Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys); 739 740 while ( 741 util.getConnection().getRegionLocator(chosenTable.getName()).getAllRegionLocations() 742 .size() != 15 || !admin.isTableAvailable(table.getName()) 743 ) { 744 Thread.sleep(200); 745 LOG.info("Waiting for new region assignment to happen"); 746 } 747 } 748 749 // Perform the actual load 750 for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) { 751 Path tableDir = 
testDir; 752 String tableNameStr = singleTableInfo.getTableDescriptor().getTableName().getNameAsString(); 753 LOG.info("Running BulkLoadHFiles on table" + tableNameStr); 754 if (writeMultipleTables) { 755 tableDir = new Path(testDir, tableNameStr); 756 } 757 Table currentTable = allTables.get(tableNameStr); 758 TableName currentTableName = currentTable.getName(); 759 BulkLoadHFiles.create(conf).bulkLoad(currentTableName, tableDir); 760 761 // Ensure data shows up 762 int expectedRows = 0; 763 if (putSortReducer) { 764 // no rows should be extracted 765 assertEquals("BulkLoadHFiles should put expected data in table", expectedRows, 766 util.countRows(currentTable)); 767 } else { 768 expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 769 assertEquals("BulkLoadHFiles should put expected data in table", expectedRows, 770 util.countRows(currentTable)); 771 Scan scan = new Scan(); 772 ResultScanner results = currentTable.getScanner(scan); 773 for (Result res : results) { 774 assertEquals(FAMILIES.length, res.rawCells().length); 775 Cell first = res.rawCells()[0]; 776 for (Cell kv : res.rawCells()) { 777 assertTrue(CellUtil.matchingRows(first, kv)); 778 assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); 779 } 780 } 781 results.close(); 782 } 783 String tableDigestBefore = util.checksumRows(currentTable); 784 // Check region locality 785 HDFSBlocksDistribution hbd = new HDFSBlocksDistribution(); 786 for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) { 787 hbd.add(region.getHDFSBlocksDistribution()); 788 } 789 for (String hostname : hostnames) { 790 float locality = hbd.getBlockLocalityIndex(hostname); 791 LOG.info("locality of [" + hostname + "]: " + locality); 792 assertEquals(100, (int) (locality * 100)); 793 } 794 795 // Cause regions to reopen 796 admin.disableTable(currentTableName); 797 while (!admin.isTableDisabled(currentTableName)) { 798 Thread.sleep(200); 799 LOG.info("Waiting for table to disable"); 800 } 801 admin.enableTable(currentTableName); 802 util.waitTableAvailable(currentTableName); 803 assertEquals("Data should remain after reopening of regions", tableDigestBefore, 804 util.checksumRows(currentTable)); 805 } 806 } finally { 807 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 808 tableInfoSingle.getRegionLocator().close(); 809 } 810 for (Entry<String, Table> singleTable : allTables.entrySet()) { 811 singleTable.getValue().close(); 812 util.deleteTable(singleTable.getValue().getName()); 813 } 814 testDir.getFileSystem(conf).delete(testDir, true); 815 util.shutdownMiniCluster(); 816 } 817 } 818 819 private void runIncrementalPELoad(Configuration conf, 820 List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer) 821 throws IOException, InterruptedException, ClassNotFoundException { 822 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 823 job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); 824 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 825 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 826 CellSerialization.class.getName()); 827 setupRandomGeneratorMapper(job, putSortReducer); 828 if (tableInfo.size() > 1) { 829 MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo); 830 int sum = 0; 831 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 832 sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size(); 833 } 834 assertEquals(sum, 
job.getNumReduceTasks()); 835 } else { 836 RegionLocator regionLocator = tableInfo.get(0).getRegionLocator(); 837 HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getTableDescriptor(), 838 regionLocator); 839 assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks()); 840 } 841 842 FileOutputFormat.setOutputPath(job, outDir); 843 844 assertFalse(util.getTestFileSystem().exists(outDir)); 845 846 assertTrue(job.waitForCompletion(true)); 847 } 848 849 /** 850 * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the 851 * family compression map is correctly serialized into and deserialized from configuration 852 */ 853 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 854 @Test 855 public void testSerializeDeserializeFamilyCompressionMap() throws IOException { 856 for (int numCfs = 0; numCfs <= 3; numCfs++) { 857 Configuration conf = new Configuration(this.util.getConfiguration()); 858 Map<String, Compression.Algorithm> familyToCompression = 859 getMockColumnFamiliesForCompression(numCfs); 860 Table table = Mockito.mock(Table.class); 861 setupMockColumnFamiliesForCompression(table, familyToCompression); 862 conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY, 863 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails, 864 Arrays.asList(table.getDescriptor()))); 865 866 // read back family specific compression setting from the configuration 867 Map<byte[], Algorithm> retrievedFamilyToCompressionMap = 868 HFileOutputFormat2.createFamilyCompressionMap(conf); 869 870 // test that we have a value for all column families that matches with the 871 // used mock values 872 for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) { 873 assertEquals("Compression configuration incorrect for column family:" + entry.getKey(), 874 entry.getValue(), retrievedFamilyToCompressionMap.get(Bytes.toBytes(entry.getKey()))); 875 } 876 } 877 } 878 879 private void setupMockColumnFamiliesForCompression(Table table, 880 Map<String, Compression.Algorithm> familyToCompression) throws IOException { 881 882 TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 883 for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) { 884 ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder 885 .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1) 886 .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build(); 887 888 mockTableDescriptor.setColumnFamily(columnFamilyDescriptor); 889 } 890 Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor(); 891 } 892 893 /** 894 * @return a map from column family names to compression algorithms for testing column family 895 * compression. 
Column family names have special characters 896 */ 897 private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) { 898 Map<String, Compression.Algorithm> familyToCompression = new HashMap<>(); 899 // use column family names having special characters 900 if (numCfs-- > 0) { 901 familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO); 902 } 903 if (numCfs-- > 0) { 904 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY); 905 } 906 if (numCfs-- > 0) { 907 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ); 908 } 909 if (numCfs-- > 0) { 910 familyToCompression.put("Family3", Compression.Algorithm.NONE); 911 } 912 return familyToCompression; 913 } 914 915 /** 916 * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the 917 * family bloom type map is correctly serialized into and deserialized from configuration 918 */ 919 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 920 @Test 921 public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException { 922 for (int numCfs = 0; numCfs <= 2; numCfs++) { 923 Configuration conf = new Configuration(this.util.getConfiguration()); 924 Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs); 925 Table table = Mockito.mock(Table.class); 926 setupMockColumnFamiliesForBloomType(table, familyToBloomType); 927 conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY, 928 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails, 929 Arrays.asList(table.getDescriptor()))); 930 931 // read back family specific data block encoding settings from the 932 // configuration 933 Map<byte[], BloomType> retrievedFamilyToBloomTypeMap = 934 HFileOutputFormat2.createFamilyBloomTypeMap(conf); 935 936 // test that we have a value for all column families that matches with the 937 // used mock values 938 for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) { 939 assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(), 940 entry.getValue(), retrievedFamilyToBloomTypeMap.get(Bytes.toBytes(entry.getKey()))); 941 } 942 } 943 } 944 945 private void setupMockColumnFamiliesForBloomType(Table table, 946 Map<String, BloomType> familyToDataBlockEncoding) throws IOException { 947 TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 948 for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) { 949 ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder 950 .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1) 951 .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build(); 952 mockTableDescriptor.setColumnFamily(columnFamilyDescriptor); 953 } 954 Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor(); 955 } 956 957 /** 958 * @return a map from column family names to compression algorithms for testing column family 959 * compression. 
Column family names have special characters 960 */ 961 private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) { 962 Map<String, BloomType> familyToBloomType = new HashMap<>(); 963 // use column family names having special characters 964 if (numCfs-- > 0) { 965 familyToBloomType.put("Family1!@#!@#&", BloomType.ROW); 966 } 967 if (numCfs-- > 0) { 968 familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL); 969 } 970 if (numCfs-- > 0) { 971 familyToBloomType.put("Family3", BloomType.NONE); 972 } 973 return familyToBloomType; 974 } 975 976 /** 977 * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the 978 * family block size map is correctly serialized into and deserialized from configuration 979 */ 980 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 981 @Test 982 public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException { 983 for (int numCfs = 0; numCfs <= 3; numCfs++) { 984 Configuration conf = new Configuration(this.util.getConfiguration()); 985 Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs); 986 Table table = Mockito.mock(Table.class); 987 setupMockColumnFamiliesForBlockSize(table, familyToBlockSize); 988 conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY, 989 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails, 990 Arrays.asList(table.getDescriptor()))); 991 992 // read back family specific data block encoding settings from the 993 // configuration 994 Map<byte[], Integer> retrievedFamilyToBlockSizeMap = 995 HFileOutputFormat2.createFamilyBlockSizeMap(conf); 996 997 // test that we have a value for all column families that matches with the 998 // used mock values 999 for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) { 1000 assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(), 1001 entry.getValue(), retrievedFamilyToBlockSizeMap.get(Bytes.toBytes(entry.getKey()))); 1002 } 1003 } 1004 } 1005 1006 private void setupMockColumnFamiliesForBlockSize(Table table, 1007 Map<String, Integer> familyToDataBlockEncoding) throws IOException { 1008 TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 1009 for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) { 1010 ColumnFamilyDescriptor columnFamilyDescriptor = 1011 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1) 1012 .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build(); 1013 mockTableDescriptor.setColumnFamily(columnFamilyDescriptor); 1014 } 1015 Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor(); 1016 } 1017 1018 /** 1019 * @return a map from column family names to compression algorithms for testing column family 1020 * compression. 
Column family names have special characters 1021 */ 1022 private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) { 1023 Map<String, Integer> familyToBlockSize = new HashMap<>(); 1024 // use column family names having special characters 1025 if (numCfs-- > 0) { 1026 familyToBlockSize.put("Family1!@#!@#&", 1234); 1027 } 1028 if (numCfs-- > 0) { 1029 familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE); 1030 } 1031 if (numCfs-- > 0) { 1032 familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE); 1033 } 1034 if (numCfs-- > 0) { 1035 familyToBlockSize.put("Family3", 0); 1036 } 1037 return familyToBlockSize; 1038 } 1039 1040 /** 1041 * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that 1042 * the family data block encoding map is correctly serialized into and deserialized from 1043 * configuration 1044 */ 1045 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 1046 @Test 1047 public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException { 1048 for (int numCfs = 0; numCfs <= 3; numCfs++) { 1049 Configuration conf = new Configuration(this.util.getConfiguration()); 1050 Map<String, DataBlockEncoding> familyToDataBlockEncoding = 1051 getMockColumnFamiliesForDataBlockEncoding(numCfs); 1052 Table table = Mockito.mock(Table.class); 1053 setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding); 1054 TableDescriptor tableDescriptor = table.getDescriptor(); 1055 conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY, 1056 HFileOutputFormat2.serializeColumnFamilyAttribute( 1057 HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor))); 1058 1059 // read back family specific data block encoding settings from the 1060 // configuration 1061 Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap = 1062 HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf); 1063 1064 // test that we have a value for all column families that matches with the 1065 // used mock values 1066 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1067 assertEquals( 1068 "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(), 1069 entry.getValue(), 1070 retrievedFamilyToDataBlockEncodingMap.get(Bytes.toBytes(entry.getKey()))); 1071 } 1072 } 1073 } 1074 1075 private void setupMockColumnFamiliesForDataBlockEncoding(Table table, 1076 Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException { 1077 TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 1078 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1079 ColumnFamilyDescriptor columnFamilyDescriptor = 1080 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1) 1081 .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0) 1082 .build(); 1083 mockTableDescriptor.setColumnFamily(columnFamilyDescriptor); 1084 } 1085 Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor(); 1086 } 1087 1088 /** 1089 * @return a map from column family names to compression algorithms for testing column family 1090 * compression. 
Column family names have special characters 1091 */ 1092 private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) { 1093 Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>(); 1094 // use column family names having special characters 1095 if (numCfs-- > 0) { 1096 familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF); 1097 } 1098 if (numCfs-- > 0) { 1099 familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF); 1100 } 1101 if (numCfs-- > 0) { 1102 familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX); 1103 } 1104 if (numCfs-- > 0) { 1105 familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE); 1106 } 1107 return familyToDataBlockEncoding; 1108 } 1109 1110 private void setupMockStartKeys(RegionLocator table) throws IOException { 1111 byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"), 1112 Bytes.toBytes("ggg"), Bytes.toBytes("zzz") }; 1113 Mockito.doReturn(mockKeys).when(table).getStartKeys(); 1114 } 1115 1116 private void setupMockTableName(RegionLocator table) throws IOException { 1117 TableName mockTableName = TableName.valueOf("mock_table"); 1118 Mockito.doReturn(mockTableName).when(table).getName(); 1119 } 1120 1121 /** 1122 * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings 1123 * from the column family descriptor 1124 */ 1125 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 1126 @Test 1127 public void testColumnFamilySettings() throws Exception { 1128 Configuration conf = new Configuration(this.util.getConfiguration()); 1129 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 1130 TaskAttemptContext context = null; 1131 Path dir = util.getDataTestDir("testColumnFamilySettings"); 1132 1133 // Setup table descriptor 1134 Table table = Mockito.mock(Table.class); 1135 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 1136 TableDescriptorBuilder tableDescriptorBuilder = 1137 TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 1138 1139 Mockito.doReturn(tableDescriptorBuilder.build()).when(table).getDescriptor(); 1140 for (ColumnFamilyDescriptor hcd : HBaseTestingUtil.generateColumnDescriptors()) { 1141 tableDescriptorBuilder.setColumnFamily(hcd); 1142 } 1143 1144 // set up the table to return some mock keys 1145 setupMockStartKeys(regionLocator); 1146 1147 try { 1148 // partial map red setup to get an operational writer for testing 1149 // We turn off the sequence file compression, because DefaultCodec 1150 // pollutes the GZip codec pool with an incompatible compressor. 
1151 conf.set("io.seqfile.compression.type", "NONE"); 1152 conf.set("hbase.fs.tmp.dir", dir.toString()); 1153 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 1154 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 1155 1156 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 1157 job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings")); 1158 setupRandomGeneratorMapper(job, false); 1159 HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); 1160 FileOutputFormat.setOutputPath(job, dir); 1161 context = createTestTaskAttemptContext(job); 1162 HFileOutputFormat2 hof = new HFileOutputFormat2(); 1163 writer = hof.getRecordWriter(context); 1164 1165 // write out random rows 1166 writeRandomKeyValues(writer, context, tableDescriptorBuilder.build().getColumnFamilyNames(), 1167 ROWSPERSPLIT); 1168 writer.close(context); 1169 1170 // Make sure that a directory was created for every CF 1171 FileSystem fs = dir.getFileSystem(conf); 1172 1173 // commit so that the filesystem has one directory per column family 1174 hof.getOutputCommitter(context).commitTask(context); 1175 hof.getOutputCommitter(context).commitJob(context); 1176 FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs)); 1177 assertEquals(tableDescriptorBuilder.build().getColumnFamilies().length, families.length); 1178 for (FileStatus f : families) { 1179 String familyStr = f.getPath().getName(); 1180 ColumnFamilyDescriptor hcd = 1181 tableDescriptorBuilder.build().getColumnFamily(Bytes.toBytes(familyStr)); 1182 // verify that the compression on this file matches the configured 1183 // compression 1184 Path dataFilePath = fs.listStatus(f.getPath())[0].getPath(); 1185 Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf); 1186 Map<byte[], byte[]> fileInfo = reader.getHFileInfo(); 1187 1188 byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY); 1189 if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE"); 1190 assertEquals( 1191 "Incorrect bloom filter used for column family " + familyStr + "(reader: " + reader + ")", 1192 hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter))); 1193 assertEquals( 1194 "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")", 1195 hcd.getCompressionType(), reader.getFileContext().getCompression()); 1196 } 1197 } finally { 1198 dir.getFileSystem(conf).delete(dir, true); 1199 } 1200 } 1201 1202 /** 1203 * Write random values to the writer assuming a table created using {@link #FAMILIES} as column 1204 * family descriptors 1205 */ 1206 private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer, 1207 TaskAttemptContext context, Set<byte[]> families, int numRows) 1208 throws IOException, InterruptedException { 1209 byte keyBytes[] = new byte[Bytes.SIZEOF_INT]; 1210 int valLength = 10; 1211 byte valBytes[] = new byte[valLength]; 1212 1213 int taskId = context.getTaskAttemptID().getTaskID().getId(); 1214 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 1215 final byte[] qualifier = Bytes.toBytes("data"); 1216 for (int i = 0; i < numRows; i++) { 1217 Bytes.putInt(keyBytes, 0, i); 1218 Bytes.random(valBytes); 1219 ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); 1220 for (byte[] family : families) { 1221 Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes); 1222 writer.write(key, kv); 1223 } 1224 } 
1225 } 1226 1227 /** 1228 * This test is to test the scenario happened in HBASE-6901. All files are bulk loaded and 1229 * excluded from minor compaction. Without the fix of HBASE-6901, an 1230 * ArrayIndexOutOfBoundsException will be thrown. 1231 */ 1232 @Ignore("Flakey: See HBASE-9051") 1233 @Test 1234 public void testExcludeAllFromMinorCompaction() throws Exception { 1235 Configuration conf = util.getConfiguration(); 1236 conf.setInt("hbase.hstore.compaction.min", 2); 1237 generateRandomStartKeys(5); 1238 1239 util.startMiniCluster(); 1240 try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin(); 1241 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1242 RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) { 1243 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1244 assertEquals("Should start with empty table", 0, util.countRows(table)); 1245 1246 // deep inspection: get the StoreFile dir 1247 final Path storePath = 1248 new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]), 1249 new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1250 Bytes.toString(FAMILIES[0]))); 1251 assertEquals(0, fs.listStatus(storePath).length); 1252 1253 // Generate two bulk load files 1254 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true); 1255 1256 for (int i = 0; i < 2; i++) { 1257 Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i); 1258 runIncrementalPELoad(conf, 1259 Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), 1260 conn.getRegionLocator(TABLE_NAMES[0]))), 1261 testDir, false); 1262 // Perform the actual load 1263 BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir); 1264 } 1265 1266 // Ensure data shows up 1267 int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1268 assertEquals("BulkLoadHFiles should put expected data in table", expectedRows, 1269 util.countRows(table)); 1270 1271 // should have a second StoreFile now 1272 assertEquals(2, fs.listStatus(storePath).length); 1273 1274 // minor compactions shouldn't get rid of the file 1275 admin.compact(TABLE_NAMES[0]); 1276 try { 1277 quickPoll(new Callable<Boolean>() { 1278 @Override 1279 public Boolean call() throws Exception { 1280 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1281 for (HRegion region : regions) { 1282 for (HStore store : region.getStores()) { 1283 store.closeAndArchiveCompactedFiles(); 1284 } 1285 } 1286 return fs.listStatus(storePath).length == 1; 1287 } 1288 }, 5000); 1289 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1290 } catch (AssertionError ae) { 1291 // this is expected behavior 1292 } 1293 1294 // a major compaction should work though 1295 admin.majorCompact(TABLE_NAMES[0]); 1296 quickPoll(new Callable<Boolean>() { 1297 @Override 1298 public Boolean call() throws Exception { 1299 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1300 for (HRegion region : regions) { 1301 for (HStore store : region.getStores()) { 1302 store.closeAndArchiveCompactedFiles(); 1303 } 1304 } 1305 return fs.listStatus(storePath).length == 1; 1306 } 1307 }, 5000); 1308 1309 } finally { 1310 util.shutdownMiniCluster(); 1311 } 1312 } 1313 1314 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 1315 @Test 1316 public void testExcludeMinorCompaction() throws Exception { 1317 Configuration conf = util.getConfiguration(); 1318 conf.setInt("hbase.hstore.compaction.min", 2); 1319 generateRandomStartKeys(5); 1320 1321 util.startMiniCluster(); 1322 try (Connection conn = ConnectionFactory.createConnection(conf); 1323 Admin admin = conn.getAdmin()) { 1324 Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction"); 1325 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1326 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1327 assertEquals("Should start with empty table", 0, util.countRows(table)); 1328 1329 // deep inspection: get the StoreFile dir 1330 final Path storePath = 1331 new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]), 1332 new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1333 Bytes.toString(FAMILIES[0]))); 1334 assertEquals(0, fs.listStatus(storePath).length); 1335 1336 // put some data in it and flush to create a storefile 1337 Put p = new Put(Bytes.toBytes("test")); 1338 p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1")); 1339 table.put(p); 1340 admin.flush(TABLE_NAMES[0]); 1341 assertEquals(1, util.countRows(table)); 1342 quickPoll(new Callable<Boolean>() { 1343 @Override 1344 public Boolean call() throws Exception { 1345 return fs.listStatus(storePath).length == 1; 1346 } 1347 }, 5000); 1348 1349 // Generate a bulk load file with more rows 1350 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true); 1351 1352 RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]); 1353 runIncrementalPELoad(conf, 1354 Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), regionLocator)), 1355 testDir, false); 1356 1357 // Perform the actual load 1358 BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir); 1359 1360 // Ensure data shows up 1361 int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1362 assertEquals("BulkLoadHFiles should put expected data in table", expectedRows + 1, 1363 util.countRows(table)); 1364 1365 // should have a second StoreFile now 1366 assertEquals(2, fs.listStatus(storePath).length); 1367 1368 // minor compactions shouldn't get rid of the file 1369 admin.compact(TABLE_NAMES[0]); 1370 try { 1371 quickPoll(new Callable<Boolean>() { 1372 @Override 1373 public Boolean call() throws Exception { 1374 return fs.listStatus(storePath).length == 1; 1375 } 1376 }, 5000); 1377 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1378 } catch (AssertionError ae) { 1379 // this is expected behavior 1380 } 1381 1382 // a major compaction should work though 1383 admin.majorCompact(TABLE_NAMES[0]); 1384 quickPoll(new Callable<Boolean>() { 1385 @Override 1386 public Boolean call() throws Exception { 1387 return fs.listStatus(storePath).length == 1; 1388 } 1389 }, 5000); 1390 1391 } finally { 1392 util.shutdownMiniCluster(); 1393 } 1394 } 1395 1396 private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception { 1397 int sleepMs = 10; 1398 int retries = (int) Math.ceil(((double) waitMs) / sleepMs); 1399 while (retries-- > 0) { 1400 if (c.call().booleanValue()) { 1401 return; 1402 } 1403 Thread.sleep(sleepMs); 1404 } 1405 fail(); 1406 } 1407 1408 public static void main(String args[]) throws Exception { 1409 new TestHFileOutputFormat2().manualTest(args); 1410 } 1411 1412 public void manualTest(String args[]) throws Exception { 1413 Configuration conf = 
  public void manualTest(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    util = new HBaseTestingUtil(conf);
    if ("newtable".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      byte[][] splitKeys = generateRandomSplitKeys(4);
      Table table = util.createTable(tname, FAMILIES, splitKeys);
    } else if ("incremental".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
        RegionLocator regionLocator = c.getRegionLocator(tname)) {
        Path outDir = new Path("incremental-out");
        runIncrementalPELoad(conf,
          Arrays
            .asList(new HFileOutputFormat2.TableInfo(admin.getDescriptor(tname), regionLocator)),
          outDir, false);
      }
    } else {
      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental");
    }
  }

  @Test
  public void testBlockStoragePolicy() throws Exception {
    util = new HBaseTestingUtil();
    Configuration conf = util.getConfiguration();
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");

    conf.set(
      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
        .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
      "ONE_SSD");
    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
    util.startMiniDFSCluster(3);
    FileSystem fs = util.getDFSCluster().getFileSystem();
    try {
      fs.mkdirs(cf1Dir);
      fs.mkdirs(cf2Dir);

      // the original block storage policy would be HOT
      String spA = getStoragePolicyName(fs, cf1Dir);
      String spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertEquals("HOT", spA);
      assertEquals("HOT", spB);

      // apply the configured storage policies to the two family directories
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
      spA = getStoragePolicyName(fs, cf1Dir);
      spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertNotNull(spA);
      assertEquals("ONE_SSD", spA);
      assertNotNull(spB);
      assertEquals("ALL_SSD", spB);
    } finally {
      fs.delete(cf1Dir, true);
      fs.delete(cf2Dir, true);
      util.shutdownMiniDFSCluster();
    }
  }
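  /**
   * Returns the HDFS storage policy name of {@code path}. Uses reflection because the
   * {@code getStoragePolicy(Path)} API is not present on all supported Hadoop versions; falls
   * back to {@link #getStoragePolicyNameForOldHDFSVersion(FileSystem, Path)} and finally to
   * "HOT" (the HDFS default) when the policy cannot be determined.
   */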
"HOT" : policy;// HOT by default 1491 } 1492 } 1493 1494 private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) { 1495 try { 1496 if (fs instanceof DistributedFileSystem) { 1497 DistributedFileSystem dfs = (DistributedFileSystem) fs; 1498 HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); 1499 if (null != status) { 1500 byte storagePolicyId = status.getStoragePolicy(); 1501 Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED"); 1502 if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) { 1503 BlockStoragePolicy[] policies = dfs.getStoragePolicies(); 1504 for (BlockStoragePolicy policy : policies) { 1505 if (policy.getId() == storagePolicyId) { 1506 return policy.getName(); 1507 } 1508 } 1509 } 1510 } 1511 } 1512 } catch (Throwable e) { 1513 LOG.warn("failed to get block storage policy of [" + path + "]", e); 1514 } 1515 1516 return null; 1517 } 1518 1519 @Test 1520 public void TestConfigurePartitioner() throws IOException { 1521 Configuration conf = util.getConfiguration(); 1522 // Create a user who is not the current user 1523 String fooUserName = "foo1234"; 1524 String fooGroupName = "group1"; 1525 UserGroupInformation ugi = 1526 UserGroupInformation.createUserForTesting(fooUserName, new String[] { fooGroupName }); 1527 // Get user's home directory 1528 Path fooHomeDirectory = ugi.doAs(new PrivilegedAction<Path>() { 1529 @Override 1530 public Path run() { 1531 try (FileSystem fs = FileSystem.get(conf)) { 1532 return fs.makeQualified(fs.getHomeDirectory()); 1533 } catch (IOException ioe) { 1534 LOG.error("Failed to get foo's home directory", ioe); 1535 } 1536 return null; 1537 } 1538 }); 1539 1540 Job job = Mockito.mock(Job.class); 1541 Mockito.doReturn(conf).when(job).getConfiguration(); 1542 ImmutableBytesWritable writable = new ImmutableBytesWritable(); 1543 List<ImmutableBytesWritable> splitPoints = new LinkedList<ImmutableBytesWritable>(); 1544 splitPoints.add(writable); 1545 1546 ugi.doAs(new PrivilegedAction<Void>() { 1547 @Override 1548 public Void run() { 1549 try { 1550 HFileOutputFormat2.configurePartitioner(job, splitPoints, false); 1551 } catch (IOException ioe) { 1552 LOG.error("Failed to configure partitioner", ioe); 1553 } 1554 return null; 1555 } 1556 }); 1557 FileSystem fs = FileSystem.get(conf); 1558 // verify that the job uses TotalOrderPartitioner 1559 verify(job).setPartitionerClass(TotalOrderPartitioner.class); 1560 // verify that TotalOrderPartitioner.setPartitionFile() is called. 1561 String partitionPathString = conf.get("mapreduce.totalorderpartitioner.path"); 1562 Assert.assertNotNull(partitionPathString); 1563 // Make sure the partion file is in foo1234's home directory, and that 1564 // the file exists. 
  @Test
  public void TestConfigurePartitioner() throws IOException {
    Configuration conf = util.getConfiguration();
    // Create a user who is not the current user
    String fooUserName = "foo1234";
    String fooGroupName = "group1";
    UserGroupInformation ugi =
      UserGroupInformation.createUserForTesting(fooUserName, new String[] { fooGroupName });
    // Get user's home directory
    Path fooHomeDirectory = ugi.doAs(new PrivilegedAction<Path>() {
      @Override
      public Path run() {
        try (FileSystem fs = FileSystem.get(conf)) {
          return fs.makeQualified(fs.getHomeDirectory());
        } catch (IOException ioe) {
          LOG.error("Failed to get foo's home directory", ioe);
        }
        return null;
      }
    });

    Job job = Mockito.mock(Job.class);
    Mockito.doReturn(conf).when(job).getConfiguration();
    ImmutableBytesWritable writable = new ImmutableBytesWritable();
    List<ImmutableBytesWritable> splitPoints = new LinkedList<ImmutableBytesWritable>();
    splitPoints.add(writable);

    ugi.doAs(new PrivilegedAction<Void>() {
      @Override
      public Void run() {
        try {
          HFileOutputFormat2.configurePartitioner(job, splitPoints, false);
        } catch (IOException ioe) {
          LOG.error("Failed to configure partitioner", ioe);
        }
        return null;
      }
    });
    FileSystem fs = FileSystem.get(conf);
    // verify that the job uses TotalOrderPartitioner
    verify(job).setPartitionerClass(TotalOrderPartitioner.class);
    // verify that TotalOrderPartitioner.setPartitionFile() is called.
    String partitionPathString = conf.get("mapreduce.totalorderpartitioner.path");
    Assert.assertNotNull(partitionPathString);
    // Make sure the partition file is in foo1234's home directory, and that the file exists.
    Assert.assertTrue(partitionPathString.startsWith(fooHomeDirectory.toString()));
    Assert.assertTrue(fs.exists(new Path(partitionPathString)));
  }

  @Test
  public void TestConfigureCompression() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("TestConfigureCompression");
    String hfileoutputformatCompression = "gz";

    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);

      Job job = Job.getInstance(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        // every written HFile should use the overridden codec
        assertEquals(hfileoutputformatCompression,
          reader.getTrailer().getCompressionCodec().getName());
      }
    } finally {
      if (writer != null && context != null) {
        writer.close(context);
      }
      dir.getFileSystem(conf).delete(dir, true);
    }

  }
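  /**
   * Verifies configureIncrementalLoad across two mini clusters: the job runs with cluster A's
   * configuration while the target table lives on cluster B, so cluster B's ZooKeeper settings
   * (and any {@code HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX} overrides) must be copied
   * into the job configuration and used by every connection the job opens. Connections are
   * captured with {@link ConfigurationCaptorConnection}.
   */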
  @Test
  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
    // Start cluster A
    util = new HBaseTestingUtil();
    Configuration confA = util.getConfiguration();
    int hostCount = 3;
    int regionNum = 20;
    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
    util.startMiniCluster(option);

    // Start cluster B
    HBaseTestingUtil utilB = new HBaseTestingUtil();
    Configuration confB = utilB.getConfiguration();
    utilB.startMiniCluster(option);

    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");

    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
    TableName tableName = TableName.valueOf("table");
    // Create table in cluster B
    try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
      RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
      // Generate the bulk load files
      // Job has zookeeper configuration for cluster A
      // Assume the job reads from cluster A via TableInputFormat and writes HFiles to cluster B
      Job job = Job.getInstance(confA, "testLocalMRIncrementalLoad");
      Configuration jobConf = job.getConfiguration();
      final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table, r);

      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));

      String bSpecificConfigKey = "my.override.config.for.b";
      String bSpecificConfigValue = "b-specific-value";
      jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX + bSpecificConfigKey,
        bSpecificConfigValue);

      FileOutputFormat.setOutputPath(job, testDir);

      assertFalse(util.getTestFileSystem().exists(testDir));

      assertTrue(job.waitForCompletion(true));

      final List<Configuration> configs =
        ConfigurationCaptorConnection.getCapturedConfigurations(key);

      assertFalse(configs.isEmpty());
      for (Configuration config : configs) {
        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
          config.get(HConstants.ZOOKEEPER_QUORUM));
        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
          config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
          config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));

        assertEquals(bSpecificConfigValue, config.get(bSpecificConfigKey));
      }
    } finally {
      utilB.deleteTable(tableName);
      testDir.getFileSystem(confA).delete(testDir, true);
      util.shutdownMiniCluster();
      utilB.shutdownMiniCluster();
    }
  }
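  /**
   * A {@link Connection} wrapper installed through
   * {@code ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL} that records every {@link Configuration}
   * a connection is created with, keyed by a UUID stored in that configuration. The multi-cluster
   * test above uses it roughly as follows (a sketch of the flow already shown in that test):
   *
   * <pre>{@code
   * UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
   * // ... run the job ...
   * List<Configuration> used = ConfigurationCaptorConnection.getCapturedConfigurations(key);
   * }</pre>
   */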
  private static class ConfigurationCaptorConnection implements Connection {
    private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";

    private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();

    private final Connection delegate;

    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user,
      ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
      // we do not use this registry, so close it
      registry.close();
      // use createAsyncConnection here to avoid infinite recursion, since the Connection
      // implementation is overridden in configureConnectionImpl below
      delegate =
        FutureUtils.get(ConnectionFactory.createAsyncConnection(conf, user, connectionAttributes))
          .toConnection();

      final String uuid = conf.get(UUID_KEY);
      if (uuid != null) {
        confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
      }
    }

    static UUID configureConnectionImpl(Configuration conf) {
      conf.setClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
        ConfigurationCaptorConnection.class, Connection.class);

      final UUID uuid = UUID.randomUUID();
      conf.set(UUID_KEY, uuid.toString());
      return uuid;
    }

    static List<Configuration> getCapturedConfigurations(UUID key) {
      return confs.get(key);
    }

    @Override
    public Configuration getConfiguration() {
      return delegate.getConfiguration();
    }

    @Override
    public Table getTable(TableName tableName) throws IOException {
      return delegate.getTable(tableName);
    }

    @Override
    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
      return delegate.getTable(tableName, pool);
    }

    @Override
    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
      return delegate.getBufferedMutator(tableName);
    }

    @Override
    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
      return delegate.getBufferedMutator(params);
    }

    @Override
    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
      return delegate.getRegionLocator(tableName);
    }

    @Override
    public void clearRegionLocationCache() {
      delegate.clearRegionLocationCache();
    }

    @Override
    public Admin getAdmin() throws IOException {
      return delegate.getAdmin();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public boolean isClosed() {
      return delegate.isClosed();
    }

    @Override
    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
      return delegate.getTableBuilder(tableName, pool);
    }

    @Override
    public AsyncConnection toAsyncConnection() {
      return delegate.toAsyncConnection();
    }

    @Override
    public String getClusterId() {
      return delegate.getClusterId();
    }

    @Override
    public Hbck getHbck() throws IOException {
      return delegate.getHbck();
    }

    @Override
    public Hbck getHbck(ServerName masterServer) throws IOException {
      return delegate.getHbck(masterServer);
    }

    @Override
    public void abort(String why, Throwable e) {
      delegate.abort(why, e);
    }

    @Override
    public boolean isAborted() {
      return delegate.isAborted();
    }
  }
}