001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.apache.hadoop.hbase.client.ConnectionFactory.createConnection; 021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNotNull; 025import static org.junit.Assert.assertNotSame; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028 029import java.io.IOException; 030import java.lang.reflect.Field; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.HashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.Map.Entry; 037import java.util.Random; 038import java.util.Set; 039import java.util.UUID; 040import java.util.concurrent.Callable; 041import java.util.concurrent.ConcurrentHashMap; 042import java.util.concurrent.CopyOnWriteArrayList; 043import java.util.concurrent.ExecutorService; 044import java.util.concurrent.ThreadLocalRandom; 045import java.util.stream.Collectors; 046import java.util.stream.Stream; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.FileStatus; 049import org.apache.hadoop.fs.FileSystem; 050import org.apache.hadoop.fs.LocatedFileStatus; 051import org.apache.hadoop.fs.Path; 052import org.apache.hadoop.fs.RemoteIterator; 053import org.apache.hadoop.hbase.ArrayBackedTag; 054import org.apache.hadoop.hbase.Cell; 055import org.apache.hadoop.hbase.CellUtil; 056import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 057import org.apache.hadoop.hbase.HBaseClassTestRule; 058import org.apache.hadoop.hbase.HBaseConfiguration; 059import org.apache.hadoop.hbase.HBaseTestingUtility; 060import org.apache.hadoop.hbase.HColumnDescriptor; 061import org.apache.hadoop.hbase.HConstants; 062import org.apache.hadoop.hbase.HDFSBlocksDistribution; 063import org.apache.hadoop.hbase.HTableDescriptor; 064import org.apache.hadoop.hbase.HadoopShims; 065import org.apache.hadoop.hbase.KeyValue; 066import org.apache.hadoop.hbase.PerformanceEvaluation; 067import org.apache.hadoop.hbase.PrivateCellUtil; 068import org.apache.hadoop.hbase.ServerName; 069import org.apache.hadoop.hbase.StartMiniClusterOption; 070import org.apache.hadoop.hbase.TableName; 071import org.apache.hadoop.hbase.Tag; 072import org.apache.hadoop.hbase.TagType; 073import org.apache.hadoop.hbase.client.Admin; 074import org.apache.hadoop.hbase.client.BufferedMutator; 075import org.apache.hadoop.hbase.client.BufferedMutatorParams; 076import org.apache.hadoop.hbase.client.ClusterConnection; 077import org.apache.hadoop.hbase.client.Connection; 078import 
org.apache.hadoop.hbase.client.ConnectionFactory; 079import org.apache.hadoop.hbase.client.Hbck; 080import org.apache.hadoop.hbase.client.Put; 081import org.apache.hadoop.hbase.client.RegionLocator; 082import org.apache.hadoop.hbase.client.Result; 083import org.apache.hadoop.hbase.client.ResultScanner; 084import org.apache.hadoop.hbase.client.Scan; 085import org.apache.hadoop.hbase.client.Table; 086import org.apache.hadoop.hbase.client.TableBuilder; 087import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 088import org.apache.hadoop.hbase.io.compress.Compression; 089import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 090import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 091import org.apache.hadoop.hbase.io.hfile.CacheConfig; 092import org.apache.hadoop.hbase.io.hfile.HFile; 093import org.apache.hadoop.hbase.io.hfile.HFile.Reader; 094import org.apache.hadoop.hbase.io.hfile.HFileScanner; 095import org.apache.hadoop.hbase.regionserver.BloomType; 096import org.apache.hadoop.hbase.regionserver.HRegion; 097import org.apache.hadoop.hbase.regionserver.HStore; 098import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem; 099import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; 100import org.apache.hadoop.hbase.security.User; 101import org.apache.hadoop.hbase.testclassification.LargeTests; 102import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; 103import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; 104import org.apache.hadoop.hbase.util.Bytes; 105import org.apache.hadoop.hbase.util.CommonFSUtils; 106import org.apache.hadoop.hbase.util.FSUtils; 107import org.apache.hadoop.hbase.util.ReflectionUtils; 108import org.apache.hadoop.hdfs.DistributedFileSystem; 109import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; 110import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 111import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; 112import org.apache.hadoop.io.NullWritable; 113import org.apache.hadoop.mapreduce.Job; 114import org.apache.hadoop.mapreduce.Mapper; 115import org.apache.hadoop.mapreduce.RecordWriter; 116import org.apache.hadoop.mapreduce.TaskAttemptContext; 117import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 118import org.junit.ClassRule; 119import org.junit.Ignore; 120import org.junit.Test; 121import org.junit.experimental.categories.Category; 122import org.mockito.Mockito; 123import org.slf4j.Logger; 124import org.slf4j.LoggerFactory; 125 126/** 127 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile 128 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and 129 * values like those of {@link PerformanceEvaluation}. 
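 * Also exercises the multi-table variant ({@link MultiTableHFileOutputFormat}) and the
 * incremental (bulk load) configuration helpers.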
130 */ 131@Category({ VerySlowMapReduceTests.class, LargeTests.class }) 132// TODO : Remove this in 3.0 133public class TestHFileOutputFormat2 { 134 135 @ClassRule 136 public static final HBaseClassTestRule CLASS_RULE = 137 HBaseClassTestRule.forClass(TestHFileOutputFormat2.class); 138 139 private final static int ROWSPERSPLIT = 1024; 140 141 public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME; 142 private static final byte[][] FAMILIES = 143 { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) }; 144 private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", "TestTable3") 145 .map(TableName::valueOf).toArray(TableName[]::new); 146 147 private HBaseTestingUtility util = new HBaseTestingUtility(); 148 149 private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class); 150 151 /** 152 * Simple mapper that makes KeyValue output. 153 */ 154 static class RandomKVGeneratingMapper 155 extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> { 156 157 private int keyLength; 158 private static final int KEYLEN_DEFAULT = 10; 159 private static final String KEYLEN_CONF = "randomkv.key.length"; 160 161 private int valLength; 162 private static final int VALLEN_DEFAULT = 10; 163 private static final String VALLEN_CONF = "randomkv.val.length"; 164 private static final byte[] QUALIFIER = Bytes.toBytes("data"); 165 private boolean multiTableMapper = false; 166 private TableName[] tables = null; 167 168 @Override 169 protected void setup(Context context) throws IOException, InterruptedException { 170 super.setup(context); 171 172 Configuration conf = context.getConfiguration(); 173 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 174 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 175 multiTableMapper = 176 conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); 177 if (multiTableMapper) { 178 tables = TABLE_NAMES; 179 } else { 180 tables = new TableName[] { TABLE_NAMES[0] }; 181 } 182 } 183 184 @Override 185 protected void map(NullWritable n1, NullWritable n2, 186 Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context) 187 throws java.io.IOException, InterruptedException { 188 189 byte keyBytes[] = new byte[keyLength]; 190 byte valBytes[] = new byte[valLength]; 191 192 int taskId = context.getTaskAttemptID().getTaskID().getId(); 193 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 194 byte[] key; 195 for (int j = 0; j < tables.length; ++j) { 196 for (int i = 0; i < ROWSPERSPLIT; i++) { 197 Bytes.random(keyBytes); 198 // Ensure that unique tasks generate unique keys 199 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 200 Bytes.random(valBytes); 201 key = keyBytes; 202 if (multiTableMapper) { 203 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 204 } 205 206 for (byte[] family : TestHFileOutputFormat2.FAMILIES) { 207 Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); 208 context.write(new ImmutableBytesWritable(key), kv); 209 } 210 } 211 } 212 } 213 } 214 215 /** 216 * Simple mapper that makes Put output. 
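 * Each Put is written with a TTL of 1 ms, so once the resulting HFiles are bulk loaded a scan of
 * the table is expected to return no rows (see the putSortReducer branch of doIncrementalLoadTest).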
217 */ 218 static class RandomPutGeneratingMapper 219 extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> { 220 221 private int keyLength; 222 private static final int KEYLEN_DEFAULT = 10; 223 private static final String KEYLEN_CONF = "randomkv.key.length"; 224 225 private int valLength; 226 private static final int VALLEN_DEFAULT = 10; 227 private static final String VALLEN_CONF = "randomkv.val.length"; 228 private static final byte[] QUALIFIER = Bytes.toBytes("data"); 229 private boolean multiTableMapper = false; 230 private TableName[] tables = null; 231 232 @Override 233 protected void setup(Context context) throws IOException, InterruptedException { 234 super.setup(context); 235 236 Configuration conf = context.getConfiguration(); 237 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 238 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 239 multiTableMapper = 240 conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); 241 if (multiTableMapper) { 242 tables = TABLE_NAMES; 243 } else { 244 tables = new TableName[] { TABLE_NAMES[0] }; 245 } 246 } 247 248 @Override 249 protected void map(NullWritable n1, NullWritable n2, 250 Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context) 251 throws java.io.IOException, InterruptedException { 252 253 byte keyBytes[] = new byte[keyLength]; 254 byte valBytes[] = new byte[valLength]; 255 256 int taskId = context.getTaskAttemptID().getTaskID().getId(); 257 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 258 259 byte[] key; 260 for (int j = 0; j < tables.length; ++j) { 261 for (int i = 0; i < ROWSPERSPLIT; i++) { 262 Bytes.random(keyBytes); 263 // Ensure that unique tasks generate unique keys 264 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 265 Bytes.random(valBytes); 266 key = keyBytes; 267 if (multiTableMapper) { 268 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 269 } 270 271 for (byte[] family : TestHFileOutputFormat2.FAMILIES) { 272 Put p = new Put(keyBytes); 273 p.addColumn(family, QUALIFIER, valBytes); 274 // set TTL to very low so that the scan does not return any value 275 p.setTTL(1l); 276 context.write(new ImmutableBytesWritable(key), p); 277 } 278 } 279 } 280 } 281 } 282 283 private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) { 284 if (putSortReducer) { 285 job.setInputFormatClass(NMapInputFormat.class); 286 job.setMapperClass(RandomPutGeneratingMapper.class); 287 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 288 job.setMapOutputValueClass(Put.class); 289 } else { 290 job.setInputFormatClass(NMapInputFormat.class); 291 job.setMapperClass(RandomKVGeneratingMapper.class); 292 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 293 job.setMapOutputValueClass(KeyValue.class); 294 } 295 } 296 297 /** 298 * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose 299 * timestamp is {@link HConstants#LATEST_TIMESTAMP}. 300 * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a> 301 */ 302 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 303 @Test 304 public void test_LATEST_TIMESTAMP_isReplaced() throws Exception { 305 Configuration conf = new Configuration(this.util.getConfiguration()); 306 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 307 TaskAttemptContext context = null; 308 Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced"); 309 try { 310 Job job = new Job(conf); 311 FileOutputFormat.setOutputPath(job, dir); 312 context = createTestTaskAttemptContext(job); 313 HFileOutputFormat2 hof = new HFileOutputFormat2(); 314 writer = hof.getRecordWriter(context); 315 final byte[] b = Bytes.toBytes("b"); 316 317 // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be 318 // changed by call to write. Check all in kv is same but ts. 319 KeyValue kv = new KeyValue(b, b, b); 320 KeyValue original = kv.clone(); 321 writer.write(new ImmutableBytesWritable(), kv); 322 assertFalse(original.equals(kv)); 323 assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv))); 324 assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv))); 325 assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv))); 326 assertNotSame(original.getTimestamp(), kv.getTimestamp()); 327 assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp()); 328 329 // Test 2. Now test passing a kv that has explicit ts. It should not be 330 // changed by call to record write. 331 kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b); 332 original = kv.clone(); 333 writer.write(new ImmutableBytesWritable(), kv); 334 assertTrue(original.equals(kv)); 335 } finally { 336 if (writer != null && context != null) writer.close(context); 337 dir.getFileSystem(conf).delete(dir, true); 338 } 339 } 340 341 private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception { 342 HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class); 343 TaskAttemptContext context = 344 hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0"); 345 return context; 346 } 347 348 /* 349 * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by 350 * time-restricted scans. 351 */ 352 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 353 @Test 354 public void test_TIMERANGE() throws Exception { 355 Configuration conf = new Configuration(this.util.getConfiguration()); 356 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 357 TaskAttemptContext context = null; 358 Path dir = util.getDataTestDir("test_TIMERANGE_present"); 359 LOG.info("Timerange dir writing to dir: " + dir); 360 try { 361 // build a record writer using HFileOutputFormat2 362 Job job = new Job(conf); 363 FileOutputFormat.setOutputPath(job, dir); 364 context = createTestTaskAttemptContext(job); 365 HFileOutputFormat2 hof = new HFileOutputFormat2(); 366 writer = hof.getRecordWriter(context); 367 368 // Pass two key values with explicit times stamps 369 final byte[] b = Bytes.toBytes("b"); 370 371 // value 1 with timestamp 2000 372 KeyValue kv = new KeyValue(b, b, b, 2000, b); 373 KeyValue original = kv.clone(); 374 writer.write(new ImmutableBytesWritable(), kv); 375 assertEquals(original, kv); 376 377 // value 2 with timestamp 1000 378 kv = new KeyValue(b, b, b, 1000, b); 379 original = kv.clone(); 380 writer.write(new ImmutableBytesWritable(), kv); 381 assertEquals(original, kv); 382 383 // verify that the file has the proper FileInfo. 
384 writer.close(context); 385 386 // the generated file lives 1 directory down from the attempt directory 387 // and is the only file, e.g. 388 // _attempt__0000_r_000000_0/b/1979617994050536795 389 FileSystem fs = FileSystem.get(conf); 390 Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent(); 391 FileStatus[] sub1 = fs.listStatus(attemptDirectory); 392 FileStatus[] file = fs.listStatus(sub1[0].getPath()); 393 394 // open as HFile Reader and pull out TIMERANGE FileInfo. 395 HFile.Reader rd = 396 HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf); 397 Map<byte[], byte[]> finfo = rd.getHFileInfo(); 398 byte[] range = finfo.get(Bytes.toBytes("TIMERANGE")); 399 assertNotNull(range); 400 401 // unmarshall and check values. 402 TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range); 403 LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax()); 404 assertEquals(1000, timeRangeTracker.getMin()); 405 assertEquals(2000, timeRangeTracker.getMax()); 406 rd.close(); 407 } finally { 408 if (writer != null && context != null) writer.close(context); 409 dir.getFileSystem(conf).delete(dir, true); 410 } 411 } 412 413 /** 414 * Run small MR job. 415 */ 416 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 417 @Test 418 public void testWritingPEData() throws Exception { 419 Configuration conf = util.getConfiguration(); 420 Path testDir = util.getDataTestDirOnTestFS("testWritingPEData"); 421 FileSystem fs = testDir.getFileSystem(conf); 422 423 // Set down this value or we OOME in eclipse. 424 conf.setInt("mapreduce.task.io.sort.mb", 20); 425 // Write a few files. 426 long hregionMaxFilesize = 10 * 1024; 427 conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize); 428 429 Job job = new Job(conf, "testWritingPEData"); 430 setupRandomGeneratorMapper(job, false); 431 // This partitioner doesn't work well for number keys but using it anyways 432 // just to demonstrate how to configure it. 433 byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 434 byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 435 436 Arrays.fill(startKey, (byte) 0); 437 Arrays.fill(endKey, (byte) 0xff); 438 439 job.setPartitionerClass(SimpleTotalOrderPartitioner.class); 440 // Set start and end rows for partitioner. 441 SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey); 442 SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey); 443 job.setReducerClass(KeyValueSortReducer.class); 444 job.setOutputFormatClass(HFileOutputFormat2.class); 445 job.setNumReduceTasks(4); 446 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 447 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 448 KeyValueSerialization.class.getName()); 449 450 FileOutputFormat.setOutputPath(job, testDir); 451 assertTrue(job.waitForCompletion(false)); 452 FileStatus[] files = fs.listStatus(testDir); 453 assertTrue(files.length > 0); 454 455 // check output file num and size. 
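    // Each family directory should hold HFiles whose approximate data size (entry count times
    // per-cell size) stays within hregion.max.filesize, with ROWSPERSPLIT cells per family in total.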
456 for (byte[] family : FAMILIES) { 457 long kvCount = 0; 458 RemoteIterator<LocatedFileStatus> iterator = 459 fs.listFiles(testDir.suffix("/" + new String(family)), true); 460 while (iterator.hasNext()) { 461 LocatedFileStatus keyFileStatus = iterator.next(); 462 HFile.Reader reader = 463 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 464 HFileScanner scanner = reader.getScanner(conf, false, false, false); 465 466 kvCount += reader.getEntries(); 467 scanner.seekTo(); 468 long perKVSize = scanner.getCell().getSerializedSize(); 469 assertTrue("Data size of each file should not be too large.", 470 perKVSize * reader.getEntries() <= hregionMaxFilesize); 471 } 472 assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount); 473 } 474 } 475 476 /** 477 * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile. 478 */ 479 @Test 480 public void test_WritingTagData() throws Exception { 481 Configuration conf = new Configuration(this.util.getConfiguration()); 482 final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version"; 483 conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); 484 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 485 TaskAttemptContext context = null; 486 Path dir = util.getDataTestDir("WritingTagData"); 487 try { 488 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 489 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 490 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 491 Job job = new Job(conf); 492 FileOutputFormat.setOutputPath(job, dir); 493 context = createTestTaskAttemptContext(job); 494 HFileOutputFormat2 hof = new HFileOutputFormat2(); 495 writer = hof.getRecordWriter(context); 496 final byte[] b = Bytes.toBytes("b"); 497 498 List<Tag> tags = new ArrayList<>(); 499 tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670))); 500 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags); 501 writer.write(new ImmutableBytesWritable(), kv); 502 writer.close(context); 503 writer = null; 504 FileSystem fs = dir.getFileSystem(conf); 505 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 506 while (iterator.hasNext()) { 507 LocatedFileStatus keyFileStatus = iterator.next(); 508 HFile.Reader reader = 509 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 510 HFileScanner scanner = reader.getScanner(conf, false, false, false); 511 scanner.seekTo(); 512 Cell cell = scanner.getCell(); 513 List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell); 514 assertTrue(tagsFromCell.size() > 0); 515 for (Tag tag : tagsFromCell) { 516 assertTrue(tag.getType() == TagType.TTL_TAG_TYPE); 517 } 518 } 519 } finally { 520 if (writer != null && context != null) writer.close(context); 521 dir.getFileSystem(conf).delete(dir, true); 522 } 523 } 524 525 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 526 @Test 527 public void testJobConfiguration() throws Exception { 528 Configuration conf = new Configuration(this.util.getConfiguration()); 529 conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, 530 util.getDataTestDir("testJobConfiguration").toString()); 531 Job job = new Job(conf); 532 job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration")); 533 Table table = Mockito.mock(Table.class); 534 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 535 setupMockStartKeys(regionLocator); 536 setupMockTableName(regionLocator); 537 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); 538 assertEquals(job.getNumReduceTasks(), 4); 539 } 540 541 private byte[][] generateRandomStartKeys(int numKeys) { 542 Random random = ThreadLocalRandom.current(); 543 byte[][] ret = new byte[numKeys][]; 544 // first region start key is always empty 545 ret[0] = HConstants.EMPTY_BYTE_ARRAY; 546 for (int i = 1; i < numKeys; i++) { 547 ret[i] = 548 PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH); 549 } 550 return ret; 551 } 552 553 private byte[][] generateRandomSplitKeys(int numKeys) { 554 Random random = ThreadLocalRandom.current(); 555 byte[][] ret = new byte[numKeys][]; 556 for (int i = 0; i < numKeys; i++) { 557 ret[i] = 558 PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH); 559 } 560 return ret; 561 } 562 563 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 564 @Test 565 public void testMRIncrementalLoad() throws Exception { 566 LOG.info("\nStarting test testMRIncrementalLoad\n"); 567 doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad"); 568 } 569 570 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 571 @Test 572 public void testMRIncrementalLoadWithSplit() throws Exception { 573 LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n"); 574 doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit"); 575 } 576 577 /** 578 * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true This test could only check the 579 * correctness of original logic if LOCALITY_SENSITIVE_CONF_KEY is set to true. Because 580 * MiniHBaseCluster always run with single hostname (and different ports), it's not possible to 581 * check the region locality by comparing region locations and DN hostnames. When MiniHBaseCluster 582 * supports explicit hostnames parameter (just like MiniDFSCluster does), we could test region 583 * locality features more easily. 584 */ 585 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 586 @Test 587 public void testMRIncrementalLoadWithLocality() throws Exception { 588 LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n"); 589 doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1"); 590 doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2"); 591 } 592 593 // @Ignore("Wahtevs") 594 @Test 595 public void testMRIncrementalLoadWithPutSortReducer() throws Exception { 596 LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n"); 597 doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer"); 598 } 599 600 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 601 boolean putSortReducer, String tableStr) throws Exception { 602 doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer, false, 603 Arrays.asList(tableStr)); 604 } 605 606 @Test 607 public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception { 608 LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n"); 609 doIncrementalLoadTest(false, false, true, false, 610 Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList())); 611 } 612 613 @Test 614 public void testMultiMRIncrementalLoadWithPutSortReducerWithNamespaceInPath() throws Exception { 615 LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducerWithNamespaceInPath\n"); 616 doIncrementalLoadTest(false, false, true, true, 617 Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList())); 618 } 619 620 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 621 boolean putSortReducer, boolean shouldWriteToTableWithNamespace, List<String> tableStr) 622 throws Exception { 623 util = new HBaseTestingUtility(); 624 Configuration conf = util.getConfiguration(); 625 conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality); 626 if (shouldWriteToTableWithNamespace) { 627 conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true); 628 } 629 int hostCount = 1; 630 int regionNum = 5; 631 if (shouldKeepLocality) { 632 // We should change host count higher than hdfs replica count when MiniHBaseCluster supports 633 // explicit hostnames parameter just like MiniDFSCluster does. 
634 hostCount = 3; 635 regionNum = 20; 636 } 637 638 String[] hostnames = new String[hostCount]; 639 for (int i = 0; i < hostCount; ++i) { 640 hostnames[i] = "datanode_" + i; 641 } 642 StartMiniClusterOption option = 643 StartMiniClusterOption.builder().numRegionServers(hostCount).dataNodeHosts(hostnames).build(); 644 util.startMiniCluster(option); 645 646 Map<String, Table> allTables = new HashMap<>(tableStr.size()); 647 List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size()); 648 boolean writeMultipleTables = tableStr.size() > 1; 649 for (String tableStrSingle : tableStr) { 650 byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1); 651 TableName tableName = TableName.valueOf(tableStrSingle); 652 Table table = util.createTable(tableName, FAMILIES, splitKeys); 653 654 RegionLocator r = util.getConnection().getRegionLocator(tableName); 655 assertEquals("Should start with empty table", 0, util.countRows(table)); 656 int numRegions = r.getStartKeys().length; 657 assertEquals("Should make " + regionNum + " regions", numRegions, regionNum); 658 659 allTables.put(tableStrSingle, table); 660 tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r)); 661 } 662 Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); 663 // Generate the bulk load files 664 runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer); 665 if (shouldWriteToTableWithNamespace) { 666 testDir = new Path(testDir, "default"); 667 } 668 669 for (Table tableSingle : allTables.values()) { 670 // This doesn't write into the table, just makes files 671 assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle)); 672 } 673 int numTableDirs = 0; 674 FileStatus[] fss = testDir.getFileSystem(conf).listStatus(testDir); 675 for (FileStatus tf : fss) { 676 Path tablePath = testDir; 677 if (writeMultipleTables) { 678 if (allTables.containsKey(tf.getPath().getName())) { 679 ++numTableDirs; 680 tablePath = tf.getPath(); 681 } else { 682 continue; 683 } 684 } 685 686 // Make sure that a directory was created for every CF 687 int dir = 0; 688 fss = tablePath.getFileSystem(conf).listStatus(tablePath); 689 for (FileStatus f : fss) { 690 for (byte[] family : FAMILIES) { 691 if (Bytes.toString(family).equals(f.getPath().getName())) { 692 ++dir; 693 } 694 } 695 } 696 assertEquals("Column family not found in FS.", FAMILIES.length, dir); 697 } 698 if (writeMultipleTables) { 699 assertEquals("Dir for all input tables not created", numTableDirs, allTables.size()); 700 } 701 702 Admin admin = util.getConnection().getAdmin(); 703 try { 704 // handle the split case 705 if (shouldChangeRegions) { 706 Table chosenTable = allTables.values().iterator().next(); 707 // Choose a semi-random table if multiple tables are available 708 LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString()); 709 admin.disableTable(chosenTable.getName()); 710 util.waitUntilNoRegionsInTransition(); 711 712 util.deleteTable(chosenTable.getName()); 713 byte[][] newSplitKeys = generateRandomSplitKeys(14); 714 Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys); 715 716 while ( 717 util.getConnection().getRegionLocator(chosenTable.getName()).getAllRegionLocations() 718 .size() != 15 || !admin.isTableAvailable(table.getName()) 719 ) { 720 Thread.sleep(200); 721 LOG.info("Waiting for new region assignment to happen"); 722 } 723 } 724 725 // Perform the actual load 726 for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) { 727 Path 
tableDir = testDir; 728 String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString(); 729 LOG.info("Running LoadIncrementalHFiles on table" + tableNameStr); 730 if (writeMultipleTables) { 731 tableDir = new Path(testDir, tableNameStr); 732 } 733 Table currentTable = allTables.get(tableNameStr); 734 TableName currentTableName = currentTable.getName(); 735 new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, 736 singleTableInfo.getRegionLocator()); 737 738 // Ensure data shows up 739 int expectedRows = 0; 740 if (putSortReducer) { 741 // no rows should be extracted 742 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 743 util.countRows(currentTable)); 744 } else { 745 expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 746 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 747 util.countRows(currentTable)); 748 Scan scan = new Scan(); 749 ResultScanner results = currentTable.getScanner(scan); 750 for (Result res : results) { 751 assertEquals(FAMILIES.length, res.rawCells().length); 752 Cell first = res.rawCells()[0]; 753 for (Cell kv : res.rawCells()) { 754 assertTrue(CellUtil.matchingRows(first, kv)); 755 assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); 756 } 757 } 758 results.close(); 759 } 760 String tableDigestBefore = util.checksumRows(currentTable); 761 // Check region locality 762 HDFSBlocksDistribution hbd = new HDFSBlocksDistribution(); 763 for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) { 764 hbd.add(region.getHDFSBlocksDistribution()); 765 } 766 for (String hostname : hostnames) { 767 float locality = hbd.getBlockLocalityIndex(hostname); 768 LOG.info("locality of [" + hostname + "]: " + locality); 769 assertEquals(100, (int) (locality * 100)); 770 } 771 772 // Cause regions to reopen 773 admin.disableTable(currentTableName); 774 while (!admin.isTableDisabled(currentTableName)) { 775 Thread.sleep(200); 776 LOG.info("Waiting for table to disable"); 777 } 778 admin.enableTable(currentTableName); 779 util.waitTableAvailable(currentTableName); 780 assertEquals("Data should remain after reopening of regions", tableDigestBefore, 781 util.checksumRows(currentTable)); 782 } 783 } finally { 784 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 785 tableInfoSingle.getRegionLocator().close(); 786 } 787 for (Entry<String, Table> singleTable : allTables.entrySet()) { 788 singleTable.getValue().close(); 789 util.deleteTable(singleTable.getValue().getName()); 790 } 791 testDir.getFileSystem(conf).delete(testDir, true); 792 util.shutdownMiniCluster(); 793 } 794 } 795 796 private void runIncrementalPELoad(Configuration conf, 797 List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer) 798 throws IOException, InterruptedException, ClassNotFoundException { 799 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 800 job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); 801 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 802 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 803 KeyValueSerialization.class.getName()); 804 setupRandomGeneratorMapper(job, putSortReducer); 805 if (tableInfo.size() > 1) { 806 MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo); 807 int sum = 0; 808 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 809 sum += 
tableInfoSingle.getRegionLocator().getAllRegionLocations().size(); 810 } 811 assertEquals(sum, job.getNumReduceTasks()); 812 } else { 813 RegionLocator regionLocator = tableInfo.get(0).getRegionLocator(); 814 HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(), 815 regionLocator); 816 assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks()); 817 } 818 819 FileOutputFormat.setOutputPath(job, outDir); 820 821 assertFalse(util.getTestFileSystem().exists(outDir)); 822 823 assertTrue(job.waitForCompletion(true)); 824 } 825 826 /** 827 * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the 828 * family compression map is correctly serialized into and deserialized from configuration 829 */ 830 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 831 @Test 832 public void testSerializeDeserializeFamilyCompressionMap() throws IOException { 833 for (int numCfs = 0; numCfs <= 3; numCfs++) { 834 Configuration conf = new Configuration(this.util.getConfiguration()); 835 Map<String, Compression.Algorithm> familyToCompression = 836 getMockColumnFamiliesForCompression(numCfs); 837 Table table = Mockito.mock(Table.class); 838 setupMockColumnFamiliesForCompression(table, familyToCompression); 839 conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY, 840 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails, 841 Arrays.asList(table.getTableDescriptor()))); 842 843 // read back family specific compression setting from the configuration 844 Map<byte[], Algorithm> retrievedFamilyToCompressionMap = 845 HFileOutputFormat2.createFamilyCompressionMap(conf); 846 847 // test that we have a value for all column families that matches with the 848 // used mock values 849 for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) { 850 assertEquals("Compression configuration incorrect for column family:" + entry.getKey(), 851 entry.getValue(), retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8"))); 852 } 853 } 854 } 855 856 private void setupMockColumnFamiliesForCompression(Table table, 857 Map<String, Compression.Algorithm> familyToCompression) throws IOException { 858 HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]); 859 for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) { 860 mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1) 861 .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0)); 862 } 863 Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); 864 } 865 866 /** 867 * @return a map from column family names to compression algorithms for testing column family 868 * compression. 
Column family names have special characters.
   */
  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      // same family name as above, so this put overwrites SNAPPY with GZ
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the
   * family bloom type map is correctly serialized into and deserialized from the configuration.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back the family-specific bloom filter settings from the configuration
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
        HFileOutputFormat2.createFamilyBloomTypeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
    Map<String, BloomType> familyToBloomType) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to bloom filter types for testing per-column-family
   *         bloom filter settings.
   *         Column family names have special characters.
   */
  private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the
   * family block size map is correctly serialized into and deserialized from the configuration.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back the family-specific block size settings from the configuration
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
        HFileOutputFormat2.createFamilyBlockSizeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
    Map<String, Integer> familyToBlockSize) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to block sizes for testing per-column-family block
   *         size settings. Column family names have special characters.
   */
  private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      // same family name and value as above, so this put changes nothing
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that
   * the family data block encoding map is correctly serialized into and deserialized from the
   * configuration.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
        getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
      HTableDescriptor tableDescriptor = table.getTableDescriptor();
      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(
          HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));

      // read back the family-specific data block encoding settings from the configuration
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
        HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals(
          "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(),
          retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
    Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to data block encodings for testing per-column-family
   *         data block encoding settings.
Column family names have special characters 1057 */ 1058 private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) { 1059 Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>(); 1060 // use column family names having special characters 1061 if (numCfs-- > 0) { 1062 familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF); 1063 } 1064 if (numCfs-- > 0) { 1065 familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF); 1066 } 1067 if (numCfs-- > 0) { 1068 familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX); 1069 } 1070 if (numCfs-- > 0) { 1071 familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE); 1072 } 1073 return familyToDataBlockEncoding; 1074 } 1075 1076 private void setupMockStartKeys(RegionLocator table) throws IOException { 1077 byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"), 1078 Bytes.toBytes("ggg"), Bytes.toBytes("zzz") }; 1079 Mockito.doReturn(mockKeys).when(table).getStartKeys(); 1080 } 1081 1082 private void setupMockTableName(RegionLocator table) throws IOException { 1083 TableName mockTableName = TableName.valueOf("mock_table"); 1084 Mockito.doReturn(mockTableName).when(table).getName(); 1085 } 1086 1087 /** 1088 * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings 1089 * from the column family descriptor 1090 */ 1091 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 1092 @Test 1093 public void testColumnFamilySettings() throws Exception { 1094 Configuration conf = new Configuration(this.util.getConfiguration()); 1095 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 1096 TaskAttemptContext context = null; 1097 Path dir = util.getDataTestDir("testColumnFamilySettings"); 1098 1099 // Setup table descriptor 1100 Table table = Mockito.mock(Table.class); 1101 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 1102 HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]); 1103 Mockito.doReturn(htd).when(table).getTableDescriptor(); 1104 for (HColumnDescriptor hcd : HBaseTestingUtility.generateColumnDescriptors()) { 1105 htd.addFamily(hcd); 1106 } 1107 1108 // set up the table to return some mock keys 1109 setupMockStartKeys(regionLocator); 1110 1111 try { 1112 // partial map red setup to get an operational writer for testing 1113 // We turn off the sequence file compression, because DefaultCodec 1114 // pollutes the GZip codec pool with an incompatible compressor. 
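      // hbase.fs.tmp.dir (set below) gives configureIncrementalLoad a place to write the
      // partitions file used by the total order partitioner.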
1115 conf.set("io.seqfile.compression.type", "NONE"); 1116 conf.set("hbase.fs.tmp.dir", dir.toString()); 1117 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 1118 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 1119 1120 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 1121 job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings")); 1122 setupRandomGeneratorMapper(job, false); 1123 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); 1124 FileOutputFormat.setOutputPath(job, dir); 1125 context = createTestTaskAttemptContext(job); 1126 HFileOutputFormat2 hof = new HFileOutputFormat2(); 1127 writer = hof.getRecordWriter(context); 1128 1129 // write out random rows 1130 writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT); 1131 writer.close(context); 1132 1133 // Make sure that a directory was created for every CF 1134 FileSystem fs = dir.getFileSystem(conf); 1135 1136 // commit so that the filesystem has one directory per column family 1137 hof.getOutputCommitter(context).commitTask(context); 1138 hof.getOutputCommitter(context).commitJob(context); 1139 FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs)); 1140 assertEquals(htd.getFamilies().size(), families.length); 1141 for (FileStatus f : families) { 1142 String familyStr = f.getPath().getName(); 1143 HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr)); 1144 // verify that the compression on this file matches the configured 1145 // compression 1146 Path dataFilePath = fs.listStatus(f.getPath())[0].getPath(); 1147 Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf); 1148 Map<byte[], byte[]> fileInfo = reader.getHFileInfo(); 1149 1150 byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY); 1151 if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE"); 1152 assertEquals( 1153 "Incorrect bloom filter used for column family " + familyStr + "(reader: " + reader + ")", 1154 hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter))); 1155 assertEquals( 1156 "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")", 1157 hcd.getCompressionType(), reader.getFileContext().getCompression()); 1158 } 1159 } finally { 1160 dir.getFileSystem(conf).delete(dir, true); 1161 } 1162 } 1163 1164 /** 1165 * Write random values to the writer assuming a table created using {@link #FAMILIES} as column 1166 * family descriptors 1167 */ 1168 private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer, 1169 TaskAttemptContext context, Set<byte[]> families, int numRows) 1170 throws IOException, InterruptedException { 1171 byte keyBytes[] = new byte[Bytes.SIZEOF_INT]; 1172 int valLength = 10; 1173 byte valBytes[] = new byte[valLength]; 1174 1175 int taskId = context.getTaskAttemptID().getTaskID().getId(); 1176 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 1177 final byte[] qualifier = Bytes.toBytes("data"); 1178 for (int i = 0; i < numRows; i++) { 1179 Bytes.putInt(keyBytes, 0, i); 1180 Bytes.random(valBytes); 1181 ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); 1182 for (byte[] family : families) { 1183 Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes); 1184 writer.write(key, kv); 1185 } 1186 } 1187 } 1188 1189 /** 1190 * This test is to test the scenario happened in HBASE-6901. 
All files are bulk loaded and 1191 * excluded from minor compaction. Without the fix of HBASE-6901, an 1192 * ArrayIndexOutOfBoundsException will be thrown. 1193 */ 1194 @Ignore("Flakey: See HBASE-9051") 1195 @Test 1196 public void testExcludeAllFromMinorCompaction() throws Exception { 1197 Configuration conf = util.getConfiguration(); 1198 conf.setInt("hbase.hstore.compaction.min", 2); 1199 generateRandomStartKeys(5); 1200 1201 util.startMiniCluster(); 1202 try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin(); 1203 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1204 RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) { 1205 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1206 assertEquals("Should start with empty table", 0, util.countRows(table)); 1207 1208 // deep inspection: get the StoreFile dir 1209 final Path storePath = 1210 new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]), 1211 new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1212 Bytes.toString(FAMILIES[0]))); 1213 assertEquals(0, fs.listStatus(storePath).length); 1214 1215 // Generate two bulk load files 1216 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true); 1217 1218 for (int i = 0; i < 2; i++) { 1219 Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i); 1220 runIncrementalPELoad(conf, 1221 Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), 1222 conn.getRegionLocator(TABLE_NAMES[0]))), 1223 testDir, false); 1224 // Perform the actual load 1225 new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator); 1226 } 1227 1228 // Ensure data shows up 1229 int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1230 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows, 1231 util.countRows(table)); 1232 1233 // should have a second StoreFile now 1234 assertEquals(2, fs.listStatus(storePath).length); 1235 1236 // minor compactions shouldn't get rid of the file 1237 admin.compact(TABLE_NAMES[0]); 1238 try { 1239 quickPoll(new Callable<Boolean>() { 1240 @Override 1241 public Boolean call() throws Exception { 1242 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1243 for (HRegion region : regions) { 1244 for (HStore store : region.getStores()) { 1245 store.closeAndArchiveCompactedFiles(); 1246 } 1247 } 1248 return fs.listStatus(storePath).length == 1; 1249 } 1250 }, 5000); 1251 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1252 } catch (AssertionError ae) { 1253 // this is expected behavior 1254 } 1255 1256 // a major compaction should work though 1257 admin.majorCompact(TABLE_NAMES[0]); 1258 quickPoll(new Callable<Boolean>() { 1259 @Override 1260 public Boolean call() throws Exception { 1261 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1262 for (HRegion region : regions) { 1263 for (HStore store : region.getStores()) { 1264 store.closeAndArchiveCompactedFiles(); 1265 } 1266 } 1267 return fs.listStatus(storePath).length == 1; 1268 } 1269 }, 5000); 1270 1271 } finally { 1272 util.shutdownMiniCluster(); 1273 } 1274 } 1275 1276 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 1277 @Test 1278 public void testExcludeMinorCompaction() throws Exception { 1279 Configuration conf = util.getConfiguration(); 1280 conf.setInt("hbase.hstore.compaction.min", 2); 1281 generateRandomStartKeys(5); 1282 1283 util.startMiniCluster(); 1284 try (Connection conn = ConnectionFactory.createConnection(conf); 1285 Admin admin = conn.getAdmin()) { 1286 Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction"); 1287 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1288 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1289 assertEquals("Should start with empty table", 0, util.countRows(table)); 1290 1291 // deep inspection: get the StoreFile dir 1292 final Path storePath = 1293 new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]), 1294 new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1295 Bytes.toString(FAMILIES[0]))); 1296 assertEquals(0, fs.listStatus(storePath).length); 1297 1298 // put some data in it and flush to create a storefile 1299 Put p = new Put(Bytes.toBytes("test")); 1300 p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1")); 1301 table.put(p); 1302 admin.flush(TABLE_NAMES[0]); 1303 assertEquals(1, util.countRows(table)); 1304 quickPoll(new Callable<Boolean>() { 1305 @Override 1306 public Boolean call() throws Exception { 1307 return fs.listStatus(storePath).length == 1; 1308 } 1309 }, 5000); 1310 1311 // Generate a bulk load file with more rows 1312 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true); 1313 1314 RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]); 1315 runIncrementalPELoad(conf, 1316 Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), regionLocator)), 1317 testDir, false); 1318 1319 // Perform the actual load 1320 new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator); 1321 1322 // Ensure data shows up 1323 int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1324 assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows + 1, 1325 util.countRows(table)); 1326 1327 // should have a second StoreFile now 1328 assertEquals(2, fs.listStatus(storePath).length); 1329 1330 // minor compactions shouldn't get rid of the file 1331 admin.compact(TABLE_NAMES[0]); 1332 try { 1333 quickPoll(new Callable<Boolean>() { 1334 @Override 1335 public Boolean call() throws Exception { 1336 return fs.listStatus(storePath).length == 1; 1337 } 1338 }, 5000); 1339 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1340 } catch (AssertionError ae) { 1341 // this is expected behavior 1342 } 1343 1344 // a major compaction should work though 1345 admin.majorCompact(TABLE_NAMES[0]); 1346 quickPoll(new Callable<Boolean>() { 1347 @Override 1348 public Boolean call() throws Exception { 1349 return fs.listStatus(storePath).length == 1; 1350 } 1351 }, 5000); 1352 1353 } finally { 1354 util.shutdownMiniCluster(); 1355 } 1356 } 1357 1358 private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception { 1359 int sleepMs = 10; 1360 int retries = (int) Math.ceil(((double) waitMs) / sleepMs); 1361 while (retries-- > 0) { 1362 if (c.call().booleanValue()) { 1363 return; 1364 } 1365 Thread.sleep(sleepMs); 1366 } 1367 fail(); 1368 } 1369 1370 public static void main(String args[]) throws Exception { 1371 new TestHFileOutputFormat2().manualTest(args); 1372 } 1373 1374 public void manualTest(String args[]) throws Exception { 1375 
  public void manualTest(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    util = new HBaseTestingUtility(conf);
    if ("newtable".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      byte[][] splitKeys = generateRandomSplitKeys(4);
      Table table = util.createTable(tname, FAMILIES, splitKeys);
    } else if ("incremental".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
        RegionLocator regionLocator = c.getRegionLocator(tname)) {
        Path outDir = new Path("incremental-out");
        runIncrementalPELoad(conf,
          Arrays.asList(
            new HFileOutputFormat2.TableInfo(admin.getTableDescriptor(tname), regionLocator)),
          outDir, false);
      }
    } else {
      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental");
    }
  }

  @Test
  public void testBlockStoragePolicy() throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");

    conf.set(
      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
        .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
      "ONE_SSD");
    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
    util.startMiniDFSCluster(3);
    FileSystem fs = util.getDFSCluster().getFileSystem();
    try {
      fs.mkdirs(cf1Dir);
      fs.mkdirs(cf2Dir);

      // the original block storage policy would be HOT
      String spA = getStoragePolicyName(fs, cf1Dir);
      String spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertEquals("HOT", spA);
      assertEquals("HOT", spB);

      // alter table cf schema to change storage policies
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
      spA = getStoragePolicyName(fs, cf1Dir);
      spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertNotNull(spA);
      assertEquals("ONE_SSD", spA);
      assertNotNull(spB);
      assertEquals("ALL_SSD", spB);
    } finally {
      fs.delete(cf1Dir, true);
      fs.delete(cf2Dir, true);
      util.shutdownMiniDFSCluster();
    }
  }
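  /**
   * Resolves the storage policy name of a path: first via FileSystem#getStoragePolicy (invoked
   * reflectively so the test still compiles against older Hadoop), then falling back to the
   * DistributedFileSystem client API for old HDFS versions, and defaulting to "HOT" when no
   * policy can be determined.
   */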
"HOT" : policy;// HOT by default 1453 } 1454 } 1455 1456 private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) { 1457 try { 1458 if (fs instanceof DistributedFileSystem) { 1459 DistributedFileSystem dfs = (DistributedFileSystem) fs; 1460 HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); 1461 if (null != status) { 1462 byte storagePolicyId = status.getStoragePolicy(); 1463 Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED"); 1464 if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) { 1465 BlockStoragePolicy[] policies = dfs.getStoragePolicies(); 1466 for (BlockStoragePolicy policy : policies) { 1467 if (policy.getId() == storagePolicyId) { 1468 return policy.getName(); 1469 } 1470 } 1471 } 1472 } 1473 } 1474 } catch (Throwable e) { 1475 LOG.warn("failed to get block storage policy of [" + path + "]", e); 1476 } 1477 1478 return null; 1479 } 1480 1481 @Test 1482 public void TestConfigureCompression() throws Exception { 1483 Configuration conf = new Configuration(this.util.getConfiguration()); 1484 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 1485 TaskAttemptContext context = null; 1486 Path dir = util.getDataTestDir("TestConfigureCompression"); 1487 String hfileoutputformatCompression = "gz"; 1488 1489 try { 1490 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 1491 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 1492 1493 conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression); 1494 1495 Job job = Job.getInstance(conf); 1496 FileOutputFormat.setOutputPath(job, dir); 1497 context = createTestTaskAttemptContext(job); 1498 HFileOutputFormat2 hof = new HFileOutputFormat2(); 1499 writer = hof.getRecordWriter(context); 1500 final byte[] b = Bytes.toBytes("b"); 1501 1502 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b); 1503 writer.write(new ImmutableBytesWritable(), kv); 1504 writer.close(context); 1505 writer = null; 1506 FileSystem fs = dir.getFileSystem(conf); 1507 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 1508 while (iterator.hasNext()) { 1509 LocatedFileStatus keyFileStatus = iterator.next(); 1510 HFile.Reader reader = 1511 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 1512 assertEquals(reader.getTrailer().getCompressionCodec().getName(), 1513 hfileoutputformatCompression); 1514 } 1515 } finally { 1516 if (writer != null && context != null) { 1517 writer.close(context); 1518 } 1519 dir.getFileSystem(conf).delete(dir, true); 1520 } 1521 1522 } 1523 1524 @Test 1525 public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception { 1526 // Start cluster A 1527 util = new HBaseTestingUtility(); 1528 Configuration confA = util.getConfiguration(); 1529 int hostCount = 3; 1530 int regionNum = 20; 1531 String[] hostnames = new String[hostCount]; 1532 for (int i = 0; i < hostCount; ++i) { 1533 hostnames[i] = "datanode_" + i; 1534 } 1535 StartMiniClusterOption option = 1536 StartMiniClusterOption.builder().numRegionServers(hostCount).dataNodeHosts(hostnames).build(); 1537 util.startMiniCluster(option); 1538 1539 // Start cluster B 1540 HBaseTestingUtility utilB = new HBaseTestingUtility(); 1541 Configuration confB = utilB.getConfiguration(); 1542 utilB.startMiniCluster(option); 1543 1544 Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); 1545 1546 byte[][] splitKeys = 
  @Test
  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
    // Start cluster A
    util = new HBaseTestingUtility();
    Configuration confA = util.getConfiguration();
    int hostCount = 3;
    int regionNum = 20;
    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    StartMiniClusterOption option =
      StartMiniClusterOption.builder().numRegionServers(hostCount).dataNodeHosts(hostnames).build();
    util.startMiniCluster(option);

    // Start cluster B
    HBaseTestingUtility utilB = new HBaseTestingUtility();
    Configuration confB = utilB.getConfiguration();
    utilB.startMiniCluster(option);

    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");

    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
    TableName tableName = TableName.valueOf("table");
    // Create table in cluster B
    try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
      RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
      // Generate the bulk load files.
      // The job carries the zookeeper configuration for cluster A; data is read from cluster A
      // by TableInputFormat while the hfiles are created for cluster B.
      Job job = new Job(confA, "testLocalMRIncrementalLoad");
      Configuration jobConf = job.getConfiguration();
      final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table, r);

      // configureIncrementalLoad should have copied cluster B's ZooKeeper settings into the job
      // configuration as remote-cluster properties
      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));

      String bSpecificConfigKey = "my.override.config.for.b";
      String bSpecificConfigValue = "b-specific-value";
      jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX + bSpecificConfigKey,
        bSpecificConfigValue);

      FileOutputFormat.setOutputPath(job, testDir);

      assertFalse(util.getTestFileSystem().exists(testDir));

      assertTrue(job.waitForCompletion(true));

      // every Connection created while the job ran should have seen cluster B's settings and
      // the remote-cluster override set above
      final List<Configuration> configs =
        ConfigurationCaptorConnection.getCapturedConfigurations(key);

      assertFalse(configs.isEmpty());
      for (Configuration config : configs) {
        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
          config.get(HConstants.ZOOKEEPER_QUORUM));
        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
          config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
          config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));

        assertEquals(bSpecificConfigValue, config.get(bSpecificConfigKey));
      }
    } finally {
      utilB.deleteTable(tableName);
      testDir.getFileSystem(confA).delete(testDir, true);
      util.shutdownMiniCluster();
      utilB.shutdownMiniCluster();
    }
  }

  private static class ConfigurationCaptorConnection implements Connection {
    private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";

    private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();

    private final Connection delegate;

    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user,
      Map<String, byte[]> connectionAttributes) throws IOException {
      Configuration confForDelegate = new Configuration(conf);
      confForDelegate.unset(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL);
      delegate = createConnection(confForDelegate, es, user, connectionAttributes);
      final String uuid = conf.get(UUID_KEY);
      if (uuid != null) {
        confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
      }
    }
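    /**
     * Registers this class as the Connection implementation in the given configuration and
     * returns the random UUID under which the configurations seen by subsequently created
     * connections are captured.
     */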
    static UUID configureConnectionImpl(Configuration conf) {
      conf.setClass(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
        ConfigurationCaptorConnection.class, Connection.class);

      final UUID uuid = UUID.randomUUID();
      conf.set(UUID_KEY, uuid.toString());
      return uuid;
    }

    static List<Configuration> getCapturedConfigurations(UUID key) {
      return confs.get(key);
    }

    @Override
    public Configuration getConfiguration() {
      return delegate.getConfiguration();
    }

    @Override
    public Table getTable(TableName tableName) throws IOException {
      return delegate.getTable(tableName);
    }

    @Override
    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
      return delegate.getTable(tableName, pool);
    }

    @Override
    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
      return delegate.getBufferedMutator(tableName);
    }

    @Override
    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
      return delegate.getBufferedMutator(params);
    }

    @Override
    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
      return delegate.getRegionLocator(tableName);
    }

    @Override
    public void clearRegionLocationCache() {
      delegate.clearRegionLocationCache();
    }

    @Override
    public Admin getAdmin() throws IOException {
      return delegate.getAdmin();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public boolean isClosed() {
      return delegate.isClosed();
    }

    @Override
    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
      return delegate.getTableBuilder(tableName, pool);
    }

    @Override
    public Hbck getHbck() throws IOException {
      return delegate.getHbck();
    }

    @Override
    public Hbck getHbck(ServerName masterServer) throws IOException {
      return delegate.getHbck(masterServer);
    }

    @Override
    public void abort(String why, Throwable e) {
      delegate.abort(why, e);
    }

    @Override
    public boolean isAborted() {
      return delegate.isAborted();
    }

    @Override
    public String getClusterId() {
      return delegate.getClusterId();
    }
  }
}